func(srv*Server)Serve(lnet.Listener)error{...ctx:=context.WithValue(baseCtx,ServerContextKey,srv)for{rw,err:=l.Accept()...connCtx:=ctxifcc:=srv.ConnContext;cc!=nil{// wrap ctxconnCtx=cc(connCtx,rw)ifconnCtx==nil{panic("ConnContext returned nil")}}...c:=srv.newConn(rw)c.setState(c.rwc,StateNew,runHooks)// before Serve can returngoc.serve(connCtx)}}
func(c*conn)serve(ctxcontext.Context){...ctx,cancelCtx:=context.WithCancel(ctx)c.cancelCtx=cancelCtxdefercancelCtx()c.r=&connReader{conn:c}c.bufr=newBufioReader(c.r)c.bufw=newBufioWriterSize(checkConnErrorWriter{c},4<<10)for{w,err:=c.readRequest(ctx)...// HTTP cannot have multiple simultaneous active requests.[*]// Until the server replies to this request, it can't read another,// so we might as well run the handler in this goroutine.// [*] Not strictly true: HTTP pipelining. We could let them all process// in parallel even if their responses need to be serialized.// But we're not going to implement HTTP pipelining because it// was never deployed in the wild and the answer is HTTP/2.serverHandler{c.server}.ServeHTTP(w,w.req)w.cancelCtx()...}}
// Dial connects to an RPC server at the specified network address.funcDial(network,addressstring)(*Client,error){conn,err:=net.Dial(network,address)...returnNewClient(conn),nil}
// Usage snippet: a synchronous call of the "Arith.Add" service method.
args := &Args{7, 8}
reply := new(Reply)
err := client.Call("Arith.Add", args, reply)

// Call issues the request through Go with a one-slot Done channel, blocks
// until the finished Call is delivered on that channel, and returns the
// Call's Error field.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

// Go builds a Call for serviceMethod with the given args and reply, attaches
// done as its Done channel, hands it to client.send and returns the Call
// without waiting on Done.
//
// The "// ..." marks a span that was elided from the original listing.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	// ... (elided in this excerpt)
	call.Done = done
	client.send(call)
	return call
}

// send registers call under a fresh sequence number and writes the request
// through the client's codec.
//
// reqMutex is held for the whole function; mutex is held only while the
// sequence number is taken and the call is stored in client.pending.
// Each "// ..." marks a span that was elided from the original listing.
func (client *Client) send(call *Call) {
	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock()
	// ... (elided in this excerpt)
	seq := client.seq
	client.seq++ // the sequence number is incremented for every request
	// Add the call to the pending list; per the original note it is
	// consumed by the input() method (not shown in this excerpt).
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	client.request.Seq = seq
	client.request.ServiceMethod = call.ServiceMethod
	err := client.codec.WriteRequest(&client.request, call.Args)
	// ... (elided in this excerpt)
}
// Register publishes the receiver's methods in the DefaultServer.funcRegister(rcvrinterface{})error{returnDefaultServer.Register(rcvr)}func(server*Server)Register(rcvrinterface{})error{returnserver.register(rcvr,"",false)}func(server*Server)register(rcvrinterface{},namestring,useNamebool)error{s:=new(service)s.typ=reflect.TypeOf(rcvr)s.rcvr=reflect.ValueOf(rcvr)sname:=reflect.Indirect(s.rcvr).Type().Name()// 获取struct nameifuseName{sname=name}...s.name=sname// Install the methodss.method=suitableMethods(s.typ,true)// 遍历所有符合条件的Methods...if_,dup:=server.serviceMap.LoadOrStore(sname,s);dup{returnerrors.New("rpc: service already defined: "+sname)}returnnil}
varlnet.Listenerl,newServerAddr=listenTCP()log.Println("NewServer test RPC server listening on",newServerAddr)gonewServer.Accept(l)funclistenTCP()(net.Listener,string){l,e:=net.Listen("tcp","127.0.0.1:0")// any available addressife!=nil{log.Fatalf("net.Listen tcp :0: %v",e)}returnl,l.Addr().String()}func(server*Server)Accept(lisnet.Listener){for{conn,err:=lis.Accept()iferr!=nil{log.Print("rpc.Serve: accept:",err.Error())return}goserver.ServeConn(conn)}}func(server*Server)ServeConn(connio.ReadWriteCloser){buf:=bufio.NewWriter(conn)srv:=&gobServerCodec{rwc:conn,dec:gob.NewDecoder(conn),enc:gob.NewEncoder(buf),encBuf:buf,}// 主要定义好 编码server.ServeCodec(srv)}func(server*Server)ServeCodec(codecServerCodec){sending:=new(sync.Mutex)wg:=new(sync.WaitGroup)for{// 根据request 获取 service,method,req,argv,replyvservice,mtype,req,argv,replyv,keepReading,err:=server.readRequest(codec)...wg.Add(1)goservice.call(server,sending,wg,mtype,req,argv,replyv,codec)}// We've seen that there are no more requests.// Wait for responses to be sent before closing codec.wg.Wait()codec.Close()}func(server*Server)readRequest(codecServerCodec)(service*service,mtype*methodType,req*Request,argv,replyvreflect.Value,keepReadingbool,errerror){service,mtype,req,keepReading,err=server.readRequestHeader(codec)iferr!=nil{...}// Decode the argument value.argIsValue:=false// if true, need to indirect before calling.ifmtype.ArgType.Kind()==reflect.Ptr{argv=reflect.New(mtype.ArgType.Elem())}else{argv=reflect.New(mtype.ArgType)argIsValue=true}// argv guaranteed to be a pointer now.iferr=codec.ReadRequestBody(argv.Interface());err!=nil{return}ifargIsValue{argv=argv.Elem()}replyv=reflect.New(mtype.ReplyType.Elem())switchmtype.ReplyType.Elem().Kind(){casereflect.Map:replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem()))casereflect.Slice:replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(),0,0))}return}// 根据请求获取 mtype,req 
(mtype是method的反射信息)func(server*Server)readRequestHeader(codecServerCodec)(svc*service,mtype*methodType,req*Request,keepReadingbool,errerror){// Grab the request header.req=server.getRequest()err=codec.ReadRequestHeader(req)...// We read the header successfully. If we see an error now,// we can still recover and move on to the next request.keepReading=true// 请求传来的 ServiceMethod 是 serviceName.MethodNamedot:=strings.LastIndex(req.ServiceMethod,".")...serviceName:=req.ServiceMethod[:dot]methodName:=req.ServiceMethod[dot+1:]// Look up the request.svci,ok:=server.serviceMap.Load(serviceName)...svc=svci.(*service)mtype=svc.method[methodName]ifmtype==nil{err=errors.New("rpc: can't find method "+req.ServiceMethod)}return}// 根据获取的method和req,反射执行方法func(s*service)call(server*Server,sending*sync.Mutex,wg*sync.WaitGroup,mtype*methodType,req*Request,argv,replyvreflect.Value,codecServerCodec){ifwg!=nil{deferwg.Done()}mtype.Lock()mtype.numCalls++mtype.Unlock()function:=mtype.method.Func// Invoke the method, providing a new value for the reply.returnValues:=function.Call([]reflect.Value{s.rcvr,argv,replyv})// The return value for the method is an error.errInter:=returnValues[0].Interface()errmsg:=""iferrInter!=nil{errmsg=errInter.(error).Error()}server.sendResponse(sending,req,replyv.Interface(),codec,errmsg)server.freeRequest(req)}