源码分析-Fabric 1.4.2 lscc启动用户链码的过程

Nysa ·
更新时间:2024-09-20
· 983 次阅读

源码分析-Fabric 1.4.2 lscc启动用户链码的过程

lscc负责管理链码的生命周期,其就是说链码容器的启动应该是lscc触发的。但是链码容器是如何从peer chaincode instantiate 触发实例化命令到peer服务完成链码的实例化,即链码容器的启动,看了一些网上的资料机会没有讲这块的。也因为又这块二次开发的需求,所以追踪了一下Fabric源码从lscc到用户链码容器启动的全过程,在此记录一下。

客户端peer chaincode instantiate部分

fabric/peer/chaincode/instantiate.go

func instantiate(cmd *cobra.Command, cf *ChaincodeCmdFactory) (*protcommon.Envelope, error) { ... // instantiate is currently only supported for one peer proposalResponse, err := cf.EndorserClients[0].ProcessProposal(context.Background(), signedProp /* EndorserClients[0]就是这个peer本身(但是这是在客户端里呀,配置中设置好的地址,详细看cli的配置 CORE_PEER_ADDRESS=peer0.org1.example.com:7051) ProcessProposal是grpc定义的函数,可以在peer.pb.go中看到定义的EndorserClient和EndorserServern内容,这里是客户端调用ProcessProposal把相关内容发送给Peer Server Peer chaincode instantiate是客户端,但客户端的endorseClient在哪里处理的?InitCmdFactory中 */ ... }

ProcessProposal可以定位到grpc的定义,这是双方交互的接口
fabric/protos/peer/peer.go

message PeerID { string name = 1; } message PeerEndpoint { PeerID id = 1; string address = 2; } service Endorser { rpc ProcessProposal(SignedProposal) returns (ProposalResponse) {} }

接下来,看下Endorser客户端是如何处理的,如何请求服务的。

先找到cf.EndorserClients是在哪里实例化的,定位到InitCmdFactory
fabric/peer/chaincode/common.go

// InitCmdFactory init the ChaincodeCmdFactory with default clients func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool) (*ChaincodeCmdFactory, error) { var err error var endorserClients []pb.EndorserClient var deliverClients []api.PeerDeliverClient if isEndorserRequired { if err = validatePeerConnectionParameters(cmdName); err != nil { return nil, errors.WithMessage(err, "error validating peer connection parameters") } for i, address := range peerAddresses { var tlsRootCertFile string if tlsRootCertFiles != nil { tlsRootCertFile = tlsRootCertFiles[i] } //找到EndorserClient实例化的代码 endorserClient, err := common.GetEndorserClientFnc(address, tlsRootCertFile) //GetEndorserClientFnc的实例是在common/common.go/init()中构建 if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error getting endorser client for %s", cmdName)) } endorserClients = append(endorserClients, endorserClient) deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("error getting deliver client for %s", cmdName)) } deliverClients = append(deliverClients, deliverClient) } if len(endorserClients) == 0 { return nil, errors.New("no endorser clients retrieved - this might indicate a bug") } } ... return &ChaincodeCmdFactory{ EndorserClients: endorserClients, DeliverClients: deliverClients, Signer: signer, BroadcastClient: broadcastClient, Certificate: certificate, }, nil }

通过GetEndorserClientFnc找到func GetEndorserClient(address, tlsRootCertFile string) (pb.EndorserClient, error)
fabric/peer/common/peerclient.go

// GetEndorserClient returns a new endorser client. If the both the address and // tlsRootCertFile are not provided, the target values for the client are taken // from the configuration settings for "peer.address" and // "peer.tls.rootcert.file" func GetEndorserClient(address, tlsRootCertFile string) (pb.EndorserClient, error) { var peerClient *PeerClient var err error if address != "" { peerClient, err = NewPeerClientForAddress(address, tlsRootCertFile) } else { peerClient, err = NewPeerClientFromEnv() } if err != nil { return nil, err } //又引出peerClient return peerClient.Endorser() }

定位到peerclient.go,发现peerClient的许多方法,对应这不同的grpc客户端,这里是fabric中grpc客户端封装处理的地方,Endorser grpc客户端生成的方法func (pc *PeerClient) Endorser() (pb.EndorserClient, error)
endorser grpc client实例,fabric/peer/common/peerclient.go

// Endorser returns a client for the Endorser service func (pc *PeerClient) Endorser() (pb.EndorserClient, error) { conn, err := pc.commonClient.NewConnection(pc.address, pc.sn) if err != nil { return nil, errors.WithMessage(err, fmt.Sprintf("endorser client failed to connect to %s", pc.address)) } return pb.NewEndorserClient(conn), nil }

到此,找到了endorser grpc客户端实例的创建,以及grpc的请求,调用接口ProcessProposal
我们看到实例化是启动了背书的grpc,实际上其他交易也是启动背书过程,这是fabric,区块链的特性,大多数的处理都会在背书上,所以待会我们主要看服务端的背书处理上

peer chaincode instantiate 链码实例化客户端部分已经完成

peer chaincode instantiate 链码实例化server端

服务端首先处理的是lscc的调用处理,就是lscc Invoke的地方,但如果继续看下去,就会发现,这里只是将作为参数的用户链码,存了起来,也就是说,将链码存储的是这里实现的。

fabric/core/scc/lscc/lscc.go/Invoke

// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade". // Deploy's arguments - {[]byte("deploy"), []byte(), } // // Invoke also implements some query-like functions // Get chaincode arguments - {[]byte("getid"), []byte(), []byte()} func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response { args := stub.GetArgs() if len(args) < 1 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } function := string(args[0]) // Handle ACL: // 1. get the signed proposal sp, err := stub.GetSignedProposal() if err != nil { return shim.Error(fmt.Sprintf("Failed retrieving signed proposal on executing %s with error %s", function, err)) } switch function { case INSTALL: ... case DEPLOY, UPGRADE: // we expect a minimum of 3 arguments, the function // name, the chain name and deployment spec if len(args) 6 { return shim.Error(PrivateChannelDataNotAvailable("").Error()) } if ac.Capabilities().PrivateChannelData() && len(args) > 7 { return shim.Error(InvalidArgsLenErr(len(args)).Error()) } depSpec := args[2] cds := &pb.ChaincodeDeploymentSpec{} err := proto.Unmarshal(depSpec, cds) if err != nil { return shim.Error(fmt.Sprintf("error unmarshaling ChaincodeDeploymentSpec: %s", err)) } // optional arguments here (they can each be nil and may or may not be present) // args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy // args[4] is the name of escc // args[5] is the name of vscc // args[6] is a marshalled CollectionConfigPackage struct var EP []byte if len(args) > 3 && len(args[3]) > 0 { EP = args[3] } else { p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel)) EP, err = utils.Marshal(p) if err != nil { return shim.Error(err.Error()) } } var escc []byte if len(args) > 4 && len(args[4]) > 0 { escc = args[4] } else { escc = []byte("escc") } var vscc []byte if len(args) > 5 && len(args[5]) > 0 { vscc = args[5] } else { vscc = []byte("vscc") } var collectionsConfig []byte // we proceed with a non-nil collection configuration only if // we Support the PrivateChannelData capability if ac.Capabilities().PrivateChannelData() && len(args) > 6 { collectionsConfig = args[6] } cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function) if err != nil { return shim.Error(err.Error()) } cdbytes, err := proto.Marshal(cd) if err != nil { return shim.Error(err.Error()) } return shim.Success(cdbytes) ... } return shim.Error(InvalidFunctionErr(function).Error()) }

最后完成lscc的处理也是需要背书、模拟执行的,所以我们关心的链码容器启动不是在lscc的invoke上的,我们还是看背书的服务端吧。

peer服务端都是在peer node start中完成启动的,先找endorser
peer node start中启动endorser grpc server: fabric/peer/chaincode/node/start.go/serve

pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{ ChannelStateRetriever: channelStateRetriever, TransientStoreRetriever: peer.TransientStoreFactory, PluginMapper: pluginMapper, SigningIdentityFetcher: signingIdentityFetcher, }) endorserSupport.PluginEndorser = pluginEndorser serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider) ... auth := authHandler.ChainFilters(serverEndorser, authFilters...) ...

通过NewEndorserServer定位到Endorser的server处理端ProcessProposal,从这里,我们将看到链码启动的完成过程,lscc启动链码的全过程

fabric/core/endorser/endorser.go

// ProcessProposal process the Proposal func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) { ... // 0 -- check and validate vr, err := e.preProcess(signedProp) if err != nil { resp := vr.resp return resp, err } prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid // obtaining once the tx simulator for this proposal. This will be nil // for chainless proposals // Also obtain a history query executor for history queries, since tx simulator does not cover history var txsim ledger.TxSimulator var historyQueryExecutor ledger.HistoryQueryExecutor if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) { if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } // txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit // of valid write-sets to the stateDB), we must release the lock as early as possible. // Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and // rwset is collected before gossip dissemination if required for privateData. For safety, we // add the following defer statement and is useful when an error occur. Note that calling // txsim.Done() more than once does not cause any issue. If the txsim is already // released, the following txsim.Done() simply returns. defer txsim.Done() if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } } txParams := &ccprovider.TransactionParams{ ChannelID: chainID, TxID: txid, SignedProp: signedProp, Proposal: prop, TXSimulator: txsim, HistoryQueryExecutor: historyQueryExecutor, } // this could be a request to a chainless SysCC // TODO: if the proposal has an extension, it will be of type ChaincodeAction; // if it's present it means that no simulation is to be performed because // we're trying to emulate a submitting peer. On the other hand, we need // to validate the supplied action before endorsing it // 1 -- simulate //进入simulateproposal,我们可以看到,交易执行的过程,并且将遇到一个重要的处理函数callChaincode cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } if res != nil { if res.Status >= shim.ERROR { endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid) var cceventBytes []byte if ccevent != nil { cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent) if err != nil { return nil, errors.Wrap(err, "failed to marshal event bytes") } } pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } return pResp, nil } } // 2 -- endorse and get a marshalled ProposalResponse message ,这里在那时不关心 var pResp *pb.ProposalResponse // TODO till we implement global ESCC, CSCC for system chaincodes // chainless proposals (such as CSCC) don't have to be endorsed if chainID == "" { pResp = &pb.ProposalResponse{Response: res} } else { // Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd) // if error, capture endorsement failure metric meterLabels := []string{ "channel", chainID, "chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version, } if err != nil { meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(false)) e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1) return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } if pResp.Response.Status >= shim.ERRORTHRESHOLD { // the default ESCC treats all status codes about threshold as errors and fails endorsement // useful to track this as a separate metric meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(true)) e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1) endorserLogger.Debugf("[%s][%s] endorseProposal() resulted in chaincode %s error for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, txid) return pResp, nil } } // Set the proposal response payload - it // contains the "return value" from the // chaincode invocation pResp.Response = res // total failed proposals = ProposalsReceived-SuccessfulProposals e.Metrics.SuccessfulProposals.Add(1) success = true return pResp, nil }

通过e.SimulateProposal(txParams, hdrExt.ChaincodeId),找到执行处理的代码callChaincode
fabric/core/endorser/endorser.go

// SimulateProposal simulates the proposal by calling the chaincode func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) { // we do expect the payload to be a ChaincodeInvocationSpec // if we are supporting other payloads in future, this be glaringly point // as something that should change cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal) if err != nil { return nil, nil, nil, nil, err } var cdLedger ccprovider.ChaincodeDefinition var version string if !e.s.IsSysCC(cid.Name) { cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator) if err != nil { return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name)) } version = cdLedger.CCVersion() err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger) if err != nil { return nil, nil, nil, nil, err } } else { version = util.GetSysCCVersion() } // ---3. execute the proposal and get simulation results var simResult *ledger.TxSimulationResults var pubSimResBytes []byte var res *pb.Response var ccevent *pb.ChaincodeEvent //找到处理核心,调用链码的部分,lscc启动mycc也是在这里 res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid) ... }

我们找到了,lscc启动用户链码的地方
fabric/core/endorser/endorser.go

// call specified chaincode (system or user) func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) { var err error var res *pb.Response var ccevent *pb.ChaincodeEvent //暂不关心 // is this a system chaincode res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input) if err != nil { return nil, nil, err } // per doc anything = 400 (ie, unambiguous errors ) // "lscc" will respond with status 200 or 500 (ie, unambiguous OK or ERROR) if res.Status >= shim.ERRORTHRESHOLD { return res, nil, nil } // ----- BEGIN - SECTION THAT MAY NEED TO BE DONE IN LSCC ------ // if this a call to deploy a chaincode, We need a mechanism // to pass TxSimulator into LSCC. Till that is worked out this // special code does the actual deploy, upgrade here so as to collect // all state under one TxSimulator // // NOTE that if there's an error all simulation, including the chaincode // table changes in lscc will be thrown away if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") { //就是这里,判断lscc,并解析lscc链码的参数,因为参数中就是链码内容 //userCDS就是我们要启动的链码 userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry) if err != nil { return nil, nil, err } var cds *pb.ChaincodeDeploymentSpec cds, err = e.SanitizeUserCDS(userCDS) if err != nil { return nil, nil, err } // this should not be a system chaincode if e.s.IsSysCC(cds.ChaincodeSpec.ChaincodeId.Name) { return nil, nil, errors.Errorf("attempting to deploy a system chaincode %s/%s", cds.ChaincodeSpec.ChaincodeId.Name, txParams.ChannelID) } //启动链码, _, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds) if err != nil { // increment the failure to indicate instantion/upgrade failures meterLabels := []string{ "channel", txParams.ChannelID, "chaincode", cds.ChaincodeSpec.ChaincodeId.Name + ":" + cds.ChaincodeSpec.ChaincodeId.Version, } e.Metrics.InitFailed.With(meterLabels...).Add(1) return nil, nil, err } //用户链码启动结束 } // ----- END ------- return res, ccevent, err }

如果分析过fabric链码容器Start的过程,那接下来的部分就不用看了,我们看lscc启动链码容器,最关键的就是找到上边这个函数处理链码启动的地方。

再定位到ExecuteLegacyInit
fabric/core/chaincode/chaincode_support.go/ExecuteLegacyInit

// ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle // is entirely deprecated. Ideally one release after the introduction of the new lifecycle. // It does not attempt to start the chaincode based on the information from lifecycle, but instead // accepts the container information directly in the form of a ChaincodeDeploymentSpec. func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) { ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec) ccci.Version = cccid.Version //启动链码的地方 err := cs.LaunchInit(ccci) if err != nil { return nil, nil, err } //注册链码,就是添加个记录 cname := ccci.Name + ":" + ccci.Version h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname) } //这里暂不关心,其实还应该看链码的初始化的部分 resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h) return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err) }

启动链码的地方LaunchInit
fabric/core/chaincode/chaincode_support.go/LaunchInit

// LaunchInit bypasses getting the chaincode spec from the LSCC table // as in the case of v1.0-v1.2 lifecycle, the chaincode will not yet be // defined in the LSCC table func (cs *ChaincodeSupport) LaunchInit(ccci *ccprovider.ChaincodeContainerInfo) error { cname := ccci.Name + ":" + ccci.Version if cs.HandlerRegistry.Handler(cname) != nil { return nil } return cs.Launcher.Launch(ccci)//启动函数 }

定位cs.Launcher.Launch(ccci)//启动函数
fabric/core/chaincode/runtime_supoort.go/Launch

func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error { var startFailCh chan error var timeoutCh <-chan time.Time startTime := time.Now() cname := ccci.Name + ":" + ccci.Version launchState, alreadyStarted := r.Registry.Launching(cname) if !alreadyStarted { ... go func() { //启动的地方 if err := r.Runtime.Start(ccci, codePackage); err != nil { startFailCh <- errors.WithMessage(err, "error starting container") return } exitCode, err := r.Runtime.Wait(ccci) if err != nil { launchState.Notify(errors.Wrap(err, "failed to wait on container exit")) } launchState.Notify(errors.Errorf("container exited with %d", exitCode)) }() } //阻塞,等待启动完成 var err error select { case <-launchState.Done(): err = errors.WithMessage(launchState.Err(), "chaincode registration failed") case err = <-startFailCh: launchState.Notify(err) r.Metrics.LaunchFailures.With("chaincode", cname).Add(1) case <-timeoutCh: err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname) launchState.Notify(err) r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1) } success := true ... }

定位Runtime.Start
fabric/core/chaincode/container_runtime.go

// Start launches chaincode in a runtime environment. func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error { cname := ccci.Name + ":" + ccci.Version //读取peer的配置文件,关于docker engine的 lc, err := c.LaunchConfig(cname, ccci.Type) if err != nil { return err } ... //这里是设置容器的启动配置 scr := container.StartContainerReq{ Builder: &container.PlatformBuilder{ Type: ccci.Type, Name: ccci.Name, Version: ccci.Version, Path: ccci.Path, CodePackage: codePackage, PlatformRegistry: c.PlatformRegistry, }, Args: lc.Args, Env: lc.Envs, FilesToUpload: lc.Files, CCID: ccintf.CCID{ Name: ccci.Name, Version: ccci.Version, }, } //启动的地方,ccci.ContainerType又两种sys和docker,用户链码是docker启动,接下来会构造docker client向docker engine请求启动容器 if err := c.Processor.Process(ccci.ContainerType, scr); err != nil { return errors.WithMessage(err, "error starting container") } return nil }

定位到Processor,是个接口,我们通过ChaincodeSupport的实例化的时候的配置,找到Processor的实例化
fabric/core/chaincode/container_runtime.go

// Processor processes vm and container requests. type Processor interface { Process(vmtype string, req container.VMCReq) error }

定位到Controller
fabric/conre/container/controller.go

func (vmc *VMController) Process(vmtype string, req VMCReq) error { v := vmc.newVM(vmtype) ccid := req.GetCCID() id := ccid.GetName() vmc.lockContainer(id) defer vmc.unlockContainer(id) return req.Do(v) }

VMCReq也是接口,这里又两个实现,一个是Start,一个是Stop,都是通过Do完成相关操作的,这里我们就看Start的
fabric/conre/container/controller.go

type VMCReq interface { Do(v VM) error GetCCID() ccintf.CCID } //StartContainerReq - properties for starting a container. type StartContainerReq struct { ccintf.CCID Builder Builder Args []string Env []string FilesToUpload map[string][]byte } func (si StartContainerReq) Do(v VM) error { //通过v启动了start,我们看VMcontroller.Proccess中v的实例 return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder) }

看实例化VM的函数
fabric/conre/container/controller.go

func (vmc *VMController) newVM(typ string) VM { //通过map得到了一个实例,我们找到最初定义chaincodeSupport的地方看实例了哪几个vm v, ok := vmc.vmProviders[typ] if !ok { vmLogger.Panicf("Programming error: unsupported VM type: %s", typ) } return v.NewVM() }

回看vm实例
fabric/peer/node/start.go/serve

chaincodeSupport := NewChaincodeSupport( config, "0.0.0.0:7052", true, ca.CertBytes(), certGenerator, &ccprovider.CCInfoFSImpl{}, lsccImpl, mockAclProvider, container.NewVMController( map[string]container.VMProvider{ //const ContainerType = "DOCKER" //第一个dockercontroller,这是我们需要的用户链码执行环境 dockercontroller.ContainerType: dockercontroller.NewProvider("", "", &disabled.Provider{}), //const ContainerType = "SYSTEM" //这就是sys的执行环境 inproccontroller.ContainerType: ipRegistry, }, ), sccp, pr, peer.DefaultSupport, &disabled.Provider{}, )

我们看实现了VM接口的controller

type VMProvider interface { NewVM() VM } //fabric/core/contaner/dockercontroller/dockercontroller.go docker // NewVM creates a new DockerVM instance func (p *Provider) NewVM() container.VM { return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics) } //fabric/core/contaner/dockercontroller/inproccontroller.go sys // NewVM creates an inproc VM instance func (r *Registry) NewVM() container.VM { return NewInprocVM(r) }

我们关心用户链码的启动,我们直接看Dockercontroller,
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder)其实就是docker cleint向docker server请求启动容器的具体过程了
fabric/core/contaner/dockercontroller/dockercontroller.go

// Start starts a container using a previously created docker image func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error { imageName, err := vm.GetVMNameForDocker(ccid) if err != nil { return err } containerName := vm.GetVMName(ccid) logger := dockerLogger.With("imageName", imageName, "containerName", containerName) //获取docker client client, err := vm.getClientFnc() ... vm.stopInternal(client, containerName, 0, false, false) ... err = vm.createContainer(client, imageName, containerName, args, env, attachStdout) ... // upload specified files to the container before starting it // this can be used for configurations such as TLS key and certs if len(filesToUpload) != 0 { // the docker upload API takes a tar file, so we need to first // consolidate the file entries to a tar payload := bytes.NewBuffer(nil) gw := gzip.NewWriter(payload) tw := tar.NewWriter(gw) for path, fileToUpload := range filesToUpload { cutil.WriteBytesToPackage(path, fileToUpload, tw) } // Write the tar file out if err := tw.Close(); err != nil { return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err) } gw.Close() ... err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{ InputStream: bytes.NewReader(payload.Bytes()), Path: "/", NoOverwriteDirNonDir: false, }) if err != nil { return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err) } } // start container with HostConfig was deprecated since v1.10 and removed in v1.2 err = client.StartContainer(containerName, nil) if err != nil { dockerLogger.Errorf("start-could not start container: %s", err) return err } dockerLogger.Debugf("Started container %s", containerName) return nil }

可以再看下getDockerClient
fabric/core/chaincode/container_runtime.go

func getDockerClient() (dockerClient, error) { return cutil.NewDockerClient() }

读取peer关于链码容器的配置,构建docker客户端
fabric/core/container/util/dokcerutil.go

func NewDockerClient() (client *docker.Client, err error) { endpoint := viper.GetString("vm.endpoint") tlsenabled := viper.GetBool("vm.docker.tls.enabled") if tlsenabled { cert := config.GetPath("vm.docker.tls.cert.file") key := config.GetPath("vm.docker.tls.key.file") ca := config.GetPath("vm.docker.tls.ca.file") client, err = docker.NewTLSClient(endpoint, cert, key, ca) } else { client, err = docker.NewClient(endpoint) } return }

至此,Peer chaincode instantiate服务端链码启动的过程我们也分析完了。这里我们主要对lscc invoke启动链码容器的过程进行了分析,并没有对lscc完整的交易流程分析,但是交易流程分析也大体是这个过程,不再描述。


作者:longtails



fabric 源码

需要 登录 后方可回复, 如果你还没有账号请 注册新账号