mirror of
https://github.com/openfaas/faasd.git
synced 2025-06-20 04:56:37 +00:00
Initial
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
.gitignoreGopkg.lockGopkg.toml
cmd
main.gopkg
vendor
github.com
Microsoft
go-winio
.gitignoreLICENSEREADME.mdbackup.goea.gofile.gofileinfo.gogo.modgo.sumhvsock.gopipe.goprivilege.goreparse.gosd.gosyscall.gozsyscall_windows.go
archive
tar
pkg
guid
hcsshim
.gitignore.gometalinter.jsonLICENSEProtobuild.tomlREADME.mdappveyor.ymllayer.gomksyscall_windows.go
cmd
container.goerrors.gofunctional_tests.ps1go.modgo.sumhcsshim.gohnsendpoint.gohnsglobals.gohnsnetwork.gohnspolicy.gohnspolicylist.gohnssupport.gointerface.gointernal
cow
hcs
hcserror
hns
hns.gohnsendpoint.gohnsfuncs.gohnsglobals.gohnsnetwork.gohnspolicy.gohnspolicylist.gohnssupport.gonamespace.gozsyscall_windows.go
interop
log
logfields
longpath
mergemaps
oc
safefile
schema1
schema2
attachment.gobattery.gocache_query_stats_response.gochipset.goclose_handle.gocom_port.gocompute_system.goconfiguration.goconsole_size.gocontainer.gocontainer_credential_guard_state.gocontainer_memory_information.godevice.godevices.goenhanced_mode_video.goflexible_io_device.goguest_connection.goguest_connection_info.goguest_crash_reporting.goguest_os.goguest_state.gohosted_system.gohv_socket.gohv_socket_2.gohv_socket_service_config.gohv_socket_system_config.gokeyboard.golayer.golinux_kernel_direct.gomapped_directory.gomapped_pipe.gomemory.gomemory_2.gomemory_information_for_vm.gomemory_stats.gomodify_setting_request.gomouse.gonetwork_adapter.gonetworking.gopause_notification.gopause_options.goplan9.goplan9_share.goprocess_details.goprocess_modify_request.goprocess_parameters.goprocess_status.goprocessor.goprocessor_2.goprocessor_stats.goproperties.goproperty_query.gordp_connection_options.goregistry_changes.goregistry_key.goregistry_value.gorestore_state.gosave_options.goscsi.goshared_memory_configuration.goshared_memory_region.goshared_memory_region_info.gosilo_properties.gostatistics.gostorage.gostorage_qo_s.gostorage_stats.gotopology.gouefi.gouefi_boot_entry.goversion.govideo_monitor.govirtual_machine.govirtual_node_info.govirtual_p_mem_controller.govirtual_p_mem_device.govirtual_smb.govirtual_smb_share.govirtual_smb_share_options.govm_memory.gowindows_crash_reporting.go
timeout
vmcompute
wclayer
activatelayer.gobaselayer.gocreatelayer.gocreatescratchlayer.godeactivatelayer.godestroylayer.goexpandscratchsize.goexportlayer.gogetlayermountpath.gogetsharedbaseimages.gograntvmaccess.goimportlayer.golayerexists.golayerid.golayerutils.golegacy.gonametoguid.gopreparelayer.goprocessimage.gounpreparelayer.gowclayer.gozsyscall_windows.go
pkg
process.gozsyscall_windows.gocontainerd
containerd
.appveyor.yml.gitignore.golangci.yml.mailmap.travis.yml.zuul.yamlADOPTERS.mdBUILDING.mdLICENSEMakefileMakefile.darwinMakefile.freebsdMakefile.linuxMakefile.windowsNOTICEPLUGINS.mdProtobuild.tomlREADME.mdRELEASES.mdROADMAP.mdRUNC.mdSCOPE.mdservices.gosignals.gosignals_unix.gosignals_windows.go
api
services
containers
content
diff
events
images
introspection
leases
namespaces
snapshots
tasks
version
types
archive
compression
strconv.gotar.gotar_opts.gotar_opts_linux.gotar_opts_windows.gotar_unix.gotar_windows.gotime.gotime_unix.gotime_windows.gocio
client.goclient_opts.gocode-of-conduct.mdcontainer.gocontainer_checkpoint_opts.gocontainer_opts.gocontainer_opts_unix.gocontainer_restore_opts.gocontainerd.servicecontainers
containerstore.gocontent
defaults
diff.godiff
errdefs
events.goevents
export.gofilters
grpc.goidentifiers
image.goimage_store.goimages
import.goinstall.goinstall_opts.golabels
lease.goleases
log
mount
lookup_unix.golookup_unsupported.gomount.gomount_linux.gomount_unix.gomount_windows.gomountinfo.gomountinfo_bsd.gomountinfo_linux.gomountinfo_unsupported.gotemp.gotemp_unix.gotemp_unsupported.go
namespaces.gonamespaces
oci
pkg
platforms
plugin
process.gopull.goreference
remotes
docker
auth.goauthorizer.goconverter.gofetcher.gohandler.gohttpreadseeker.gopusher.goregistry.goresolver.go
handlers.goresolver.goschema1
scope.gostatus.gorootfs
runtime
linux
v2
runc
snapshots
snapshotter_default_linux.gosnapshotter_default_unix.gosnapshotter_default_windows.gosys
env.goepoll.gofds.gofilesys_unix.gofilesys_windows.gomount_linux.gooom_unix.gooom_windows.goproc.goreaper.goreaper_linux.gosocket_unix.gosocket_windows.gostat_bsd.gostat_unix.gosubprocess_unsafe_linux.gosubprocess_unsafe_linux.s
task.gotask_opts.gotask_opts_unix.gounpacker.govendor.confversion
continuity
AUTHORSLICENSE
fs
copy.gocopy_linux.gocopy_unix.gocopy_windows.godiff.godiff_unix.godiff_windows.godtype_linux.godu.godu_unix.godu_windows.gohardlink.gohardlink_unix.gohardlink_windows.gopath.gostat_bsd.gostat_linux.gotime.go
syscallx
sysx
fifo
.gitignore.travis.ymlLICENSEMakefileerrors.gofifo.gohandle_linux.gohandle_nolinux.gomkfifo_nosolaris.gomkfifo_solaris.goraw.goreadme.md
ttrpc
.gitignore.travis.ymlLICENSEREADME.mdchannel.goclient.gocodec.goconfig.gohandshake.gointerceptor.gometadata.goserver.goservices.gotypes.gounixcreds_linux.go
typeurl
docker
gogo
googleapis
protobuf
AUTHORSCONTRIBUTORSGOLANG_CONTRIBUTORSLICENSE
proto
Makefileclone.gocustom_gogo.godecode.godeprecated.godiscard.goduration.goduration_gogo.goencode.goencode_gogo.goequal.goextensions.goextensions_gogo.golib.golib_gogo.gomessage_set.gopointer_reflect.gopointer_reflect_gogo.gopointer_unsafe.gopointer_unsafe_gogo.goproperties.goproperties_gogo.goskip_gogo.gotable_marshal.gotable_marshal_gogo.gotable_merge.gotable_unmarshal.gotable_unmarshal_gogo.gotext.gotext_gogo.gotext_parser.gotimestamp.gotimestamp_gogo.gowrappers.gowrappers_gogo.go
sortkeys
types
golang
groupcache
protobuf
inconshreveable
konsorten
go-windows-terminal-sequences
morikuni
opencontainers
go-digest
.mailmap.pullapprove.yml.travis.ymlCONTRIBUTING.mdLICENSELICENSE.docsMAINTAINERSREADME.mdalgorithm.godigest.godigester.godoc.goverifiers.go
image-spec
runc
runtime-spec
pkg
sirupsen
logrus
.gitignore.travis.ymlCHANGELOG.mdLICENSEREADME.mdalt_exit.goappveyor.ymldoc.goentry.goexported.goformatter.gogo.modgo.sumhooks.gojson_formatter.gologger.gologrus.goterminal_check_appengine.goterminal_check_bsd.goterminal_check_js.goterminal_check_notappengine.goterminal_check_unix.goterminal_check_windows.goterminal_notwindows.goterminal_windows.gotext_formatter.gowriter.go
spf13
cobra
.gitignore.mailmap.travis.ymlLICENSE.txtREADME.mdargs.gobash_completions.gobash_completions.mdcobra.gocommand.gocommand_notwin.gocommand_win.gogo.modgo.sumpowershell_completions.gopowershell_completions.mdshell_completions.gozsh_completions.gozsh_completions.md
cobra
cmd
testdata
pflag
.gitignore.travis.ymlLICENSEREADME.mdbool.gobool_slice.gobytes.gocount.goduration.goduration_slice.goflag.gofloat32.gofloat32_slice.gofloat64.gofloat64_slice.gogo.modgo.sumgolangflag.goint.goint16.goint32.goint32_slice.goint64.goint64_slice.goint8.goint_slice.goip.goip_slice.goipmask.goipnet.gostring.gostring_array.gostring_slice.gostring_to_int.gostring_to_int64.gostring_to_string.gouint.gouint16.gouint32.gouint64.gouint8.gouint_slice.go
syndtr
go.opencensus.io
.gitignore.travis.ymlAUTHORSCONTRIBUTING.mdLICENSEMakefileREADME.mdappveyor.ymlgo.modgo.sum
internal
opencensus.gotrace
golang.org
x
net
AUTHORSCONTRIBUTORSLICENSEPATENTS
context
http
httpguts
http2
.gitignoreDockerfileMakefileREADMEciphers.goclient_conn_pool.godatabuffer.goerrors.goflow.goframe.gogo111.gogotrack.goheadermap.go
hpack
http2.gonot_go111.gopipe.goserver.gotransport.gowrite.gowritesched.gowritesched_priority.gowritesched_random.goidna
idna10.0.0.goidna9.0.0.gopunycode.gotables10.0.0.gotables11.0.0.gotables12.00.gotables9.0.0.gotrie.gotrieval.go
internal
timeseries
trace
sync
sys
AUTHORSCONTRIBUTORSLICENSEPATENTS
unix
.gitignoreREADME.mdaffinity_linux.goaliases.goasm_aix_ppc64.sasm_darwin_386.sasm_darwin_amd64.sasm_darwin_arm.sasm_darwin_arm64.sasm_dragonfly_amd64.sasm_freebsd_386.sasm_freebsd_amd64.sasm_freebsd_arm.sasm_freebsd_arm64.sasm_linux_386.sasm_linux_amd64.sasm_linux_arm.sasm_linux_arm64.sasm_linux_mips64x.sasm_linux_mipsx.sasm_linux_ppc64x.sasm_linux_riscv64.sasm_linux_s390x.sasm_netbsd_386.sasm_netbsd_amd64.sasm_netbsd_arm.sasm_netbsd_arm64.sasm_openbsd_386.sasm_openbsd_amd64.sasm_openbsd_arm.sasm_openbsd_arm64.sasm_solaris_amd64.sbluetooth_linux.gocap_freebsd.goconstants.godev_aix_ppc.godev_aix_ppc64.godev_darwin.godev_dragonfly.godev_freebsd.godev_linux.godev_netbsd.godev_openbsd.godirent.goendian_big.goendian_little.goenv_unix.goerrors_freebsd_386.goerrors_freebsd_amd64.goerrors_freebsd_arm.gofcntl.gofcntl_darwin.gofcntl_linux_32bit.gofdset.gogccgo.gogccgo_c.cgccgo_linux_amd64.goioctl.gomkall.shmkasm_darwin.gomkerrors.shmkpost.gomksyscall.gomksyscall_aix_ppc.gomksyscall_aix_ppc64.gomksyscall_solaris.gomksysctl_openbsd.gomksysnum.gopagesize_unix.gopledge_openbsd.gorace.gorace0.goreaddirent_getdents.goreaddirent_getdirentries.gosockcmsg_dragonfly.gosockcmsg_linux.gosockcmsg_unix.gosockcmsg_unix_other.gostr.gosyscall.gosyscall_aix.gosyscall_aix_ppc.gosyscall_aix_ppc64.gosyscall_bsd.gosyscall_darwin.1_12.gosyscall_darwin.1_13.gosyscall_darwin.gosyscall_darwin_386.1_11.gosyscall_darwin_386.gosyscall_darwin_amd64.1_11.gosyscall_darwin_amd64.gosyscall_darwin_arm.1_11.gosyscall_darwin_arm.gosyscall_darwin_arm64.1_11.gosyscall_darwin_arm64.gosyscall_darwin_libSystem.gosyscall_dragonfly.gosyscall_dragonfly_amd64.gosyscall_freebsd.gosyscall_freebsd_386.gosyscall_freebsd_amd64.gosyscall_freebsd_arm.gosyscall_freebsd_arm64.gosyscall_linux.gosyscall_linux_386.gosyscall_linux_amd64.gosyscall_linux_amd64_gc.gosyscall_linux_arm.gosyscall_linux_arm64.gosyscall_linux_gc.gosyscall_linux_gc_386.gosyscall_linux_gccgo_386.gosyscall_linux_gccgo_arm.gosyscall_linux_mips64x.gosyscall_linux_mipsx.gosyscall_linux_ppc64x.gosyscall_linux_riscv64.gosyscall_linux_s390x.gosyscall_linux_sparc64.gosyscall_netbsd.gosyscall_netbsd_386.gosyscall_netbsd_amd64.gosyscall_netbsd_arm.gosyscall_netbsd_arm64.gosyscall_openbsd.gosyscall_openbsd_386.gosyscall_openbsd_amd64.gosyscall_openbsd_arm.gosyscall_openbsd_arm64.gosyscall_solaris.gosyscall_solaris_amd64.gosyscall_unix.gosyscall_unix_gc.gosyscall_unix_gc_ppc64x.gotimestruct.gotypes_aix.gotypes_darwin.gotypes_dragonfly.gotypes_freebsd.gotypes_netbsd.gotypes_openbsd.gotypes_solaris.gounveil_openbsd.goxattr_bsd.gozerrors_aix_ppc.gozerrors_aix_ppc64.gozerrors_darwin_386.gozerrors_darwin_amd64.gozerrors_darwin_arm.gozerrors_darwin_arm64.gozerrors_dragonfly_amd64.gozerrors_freebsd_386.gozerrors_freebsd_amd64.gozerrors_freebsd_arm.gozerrors_freebsd_arm64.gozerrors_linux_386.gozerrors_linux_amd64.gozerrors_linux_arm.gozerrors_linux_arm64.gozerrors_linux_mips.gozerrors_linux_mips64.gozerrors_linux_mips64le.gozerrors_linux_mipsle.gozerrors_linux_ppc64.gozerrors_linux_ppc64le.gozerrors_linux_riscv64.gozerrors_linux_s390x.gozerrors_linux_sparc64.gozerrors_netbsd_386.gozerrors_netbsd_amd64.gozerrors_netbsd_arm.gozerrors_netbsd_arm64.gozerrors_openbsd_386.gozerrors_openbsd_amd64.gozerrors_openbsd_arm.gozerrors_openbsd_arm64.gozerrors_solaris_amd64.gozptrace_armnn_linux.gozptrace_linux_arm64.gozptrace_mipsnn_linux.gozptrace_mipsnnle_linux.gozptrace_x86_linux.gozsyscall_aix_ppc.gozsyscall_aix_ppc64.gozsyscall_aix_ppc64_gc.gozsyscall_aix_ppc64_gccgo.gozsyscall_darwin_386.1_11.gozsyscall_darwin_386.1_13.gozsyscall_darwin_386.1_13.szsyscall_darwin_386.gozsyscall_darwin_386.szsyscall_darwin_amd64.1_11.gozsyscall_darwin_amd64.1_13.gozsyscall_darwin_amd64.1_13.szsyscall_darwin_amd64.gozsyscall_darwin_amd64.szsyscall_darwin_arm.1_11.gozsyscall_darwin_arm.1_13.gozsyscall_darwin_arm.1_13.szsyscall_darwin_arm.gozsyscall_darwin_arm.szsyscall_darwin_arm64.1_11.gozsyscall_darwin_arm64.1_13.gozsyscall_darwin_arm64.1_13.szsyscall_darwin_arm64.gozsyscall_darwin_arm64.szsyscall_dragonfly_amd64.gozsyscall_freebsd_386.gozsyscall_freebsd_amd64.gozsyscall_freebsd_arm.gozsyscall_freebsd_arm64.gozsyscall_linux_386.gozsyscall_linux_amd64.gozsyscall_linux_arm.gozsyscall_linux_arm64.gozsyscall_linux_mips.gozsyscall_linux_mips64.gozsyscall_linux_mips64le.gozsyscall_linux_mipsle.gozsyscall_linux_ppc64.gozsyscall_linux_ppc64le.gozsyscall_linux_riscv64.gozsyscall_linux_s390x.gozsyscall_linux_sparc64.gozsyscall_netbsd_386.gozsyscall_netbsd_amd64.gozsyscall_netbsd_arm.gozsyscall_netbsd_arm64.gozsyscall_openbsd_386.gozsyscall_openbsd_amd64.gozsyscall_openbsd_arm.gozsyscall_openbsd_arm64.gozsyscall_solaris_amd64.gozsysctl_openbsd_386.gozsysctl_openbsd_amd64.gozsysctl_openbsd_arm.gozsysctl_openbsd_arm64.gozsysnum_darwin_386.gozsysnum_darwin_amd64.gozsysnum_darwin_arm.gozsysnum_darwin_arm64.gozsysnum_dragonfly_amd64.gozsysnum_freebsd_386.gozsysnum_freebsd_amd64.gozsysnum_freebsd_arm.gozsysnum_freebsd_arm64.gozsysnum_linux_386.gozsysnum_linux_amd64.gozsysnum_linux_arm.gozsysnum_linux_arm64.gozsysnum_linux_mips.gozsysnum_linux_mips64.gozsysnum_linux_mips64le.gozsysnum_linux_mipsle.gozsysnum_linux_ppc64.gozsysnum_linux_ppc64le.gozsysnum_linux_riscv64.gozsysnum_linux_s390x.gozsysnum_linux_sparc64.gozsysnum_netbsd_386.gozsysnum_netbsd_amd64.gozsysnum_netbsd_arm.gozsysnum_netbsd_arm64.gozsysnum_openbsd_386.gozsysnum_openbsd_amd64.gozsysnum_openbsd_arm.gozsysnum_openbsd_arm64.goztypes_aix_ppc.goztypes_aix_ppc64.goztypes_darwin_386.goztypes_darwin_amd64.goztypes_darwin_arm.goztypes_darwin_arm64.goztypes_dragonfly_amd64.goztypes_freebsd_386.goztypes_freebsd_amd64.goztypes_freebsd_arm.goztypes_freebsd_arm64.goztypes_linux_386.goztypes_linux_amd64.goztypes_linux_arm.goztypes_linux_arm64.goztypes_linux_mips.goztypes_linux_mips64.goztypes_linux_mips64le.goztypes_linux_mipsle.goztypes_linux_ppc64.goztypes_linux_ppc64le.goztypes_linux_riscv64.goztypes_linux_s390x.goztypes_linux_sparc64.goztypes_netbsd_386.goztypes_netbsd_amd64.goztypes_netbsd_arm.goztypes_netbsd_arm64.goztypes_openbsd_386.goztypes_openbsd_amd64.goztypes_openbsd_arm.goztypes_openbsd_arm64.goztypes_solaris_amd64.go
windows
aliases.godll_windows.goempty.senv_windows.goeventlog.goexec_windows.gomemory_windows.gomkerrors.bashmkknownfolderids.bashmksyscall.gorace.gorace0.gosecurity_windows.goservice.gostr.gosyscall.gosyscall_windows.gotypes_windows.gotypes_windows_386.gotypes_windows_amd64.gotypes_windows_arm.gozerrors_windows.gozknownfolderids_windows.gozsyscall_windows.go
text
AUTHORSCONTRIBUTORSLICENSEPATENTS
collate
internal
colltab
gen
language
common.gocompact.go
compact
compose.gocoverage.gogen.gogen_common.golanguage.golookup.gomatch.goparse.gotables.gotags.gotag
triegen
ucd
language
secure
transform
unicode
google.golang.org
genproto
grpc
.travis.ymlAUTHORSCONTRIBUTING.mdLICENSEMakefileREADME.mdbackoff.gobalancer.gocall.goclientconn.gocodec.gocodegen.shinstall_gae.shinterceptor.go
balancer
balancer_conn_wrappers.gobalancer_v1_wrapper.gobinarylog
grpc_binarylog_v1
codes
connectivity
credentials
dialoptions.godoc.goencoding
go.modgo.sumgrpclog
health
grpc_health_v1
internal
keepalive
metadata
naming
peer
picker_wrapper.gopickfirst.gopreloader.goproxy.goresolver
resolver_conn_wrapper.gorpc_util.goserver.goservice_config.goserviceconfig
stats
status
stream.gotap
trace.goversion.govet.sh
350
vendor/github.com/containerd/ttrpc/client.go
generated
vendored
Normal file
350
vendor/github.com/containerd/ttrpc/client.go
generated
vendored
Normal file
@ -0,0 +1,350 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ErrClosed is returned by client methods when the underlying connection is
|
||||
// closed.
|
||||
var ErrClosed = errors.New("ttrpc: closed")
|
||||
|
||||
// Client for a ttrpc server
|
||||
type Client struct {
|
||||
codec codec
|
||||
conn net.Conn
|
||||
channel *channel
|
||||
calls chan *callRequest
|
||||
|
||||
ctx context.Context
|
||||
closed func()
|
||||
|
||||
closeOnce sync.Once
|
||||
userCloseFunc func()
|
||||
|
||||
errOnce sync.Once
|
||||
err error
|
||||
interceptor UnaryClientInterceptor
|
||||
}
|
||||
|
||||
// ClientOpts configures a client
|
||||
type ClientOpts func(c *Client)
|
||||
|
||||
// WithOnClose sets the close func whenever the client's Close() method is called
|
||||
func WithOnClose(onClose func()) ClientOpts {
|
||||
return func(c *Client) {
|
||||
c.userCloseFunc = onClose
|
||||
}
|
||||
}
|
||||
|
||||
// WithUnaryClientInterceptor sets the provided client interceptor
|
||||
func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
|
||||
return func(c *Client) {
|
||||
c.interceptor = i
|
||||
}
|
||||
}
|
||||
|
||||
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &Client{
|
||||
codec: codec{},
|
||||
conn: conn,
|
||||
channel: newChannel(conn),
|
||||
calls: make(chan *callRequest),
|
||||
closed: cancel,
|
||||
ctx: ctx,
|
||||
userCloseFunc: func() {},
|
||||
interceptor: defaultClientInterceptor,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
}
|
||||
|
||||
go c.run()
|
||||
return c
|
||||
}
|
||||
|
||||
type callRequest struct {
|
||||
ctx context.Context
|
||||
req *Request
|
||||
resp *Response // response will be written back here
|
||||
errs chan error // error written here on completion
|
||||
}
|
||||
|
||||
func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error {
|
||||
payload, err := c.codec.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
creq = &Request{
|
||||
Service: service,
|
||||
Method: method,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
cresp = &Response{}
|
||||
)
|
||||
|
||||
if metadata, ok := GetMetadata(ctx); ok {
|
||||
metadata.setRequest(creq)
|
||||
}
|
||||
|
||||
if dl, ok := ctx.Deadline(); ok {
|
||||
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
|
||||
}
|
||||
|
||||
info := &UnaryClientInfo{
|
||||
FullMethod: fullPath(service, method),
|
||||
}
|
||||
if err := c.interceptor(ctx, creq, cresp, info, c.dispatch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.codec.Unmarshal(cresp.Payload, resp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cresp.Status != nil && cresp.Status.Code != int32(codes.OK) {
|
||||
return status.ErrorProto(cresp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
|
||||
errs := make(chan error, 1)
|
||||
call := &callRequest{
|
||||
ctx: ctx,
|
||||
req: req,
|
||||
resp: resp,
|
||||
errs: errs,
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case c.calls <- call:
|
||||
case <-c.ctx.Done():
|
||||
return c.error()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errs:
|
||||
return filterCloseErr(err)
|
||||
case <-c.ctx.Done():
|
||||
return c.error()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
c.closeOnce.Do(func() {
|
||||
c.closed()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
type message struct {
|
||||
messageHeader
|
||||
p []byte
|
||||
err error
|
||||
}
|
||||
|
||||
type receiver struct {
|
||||
wg *sync.WaitGroup
|
||||
messages chan *message
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *receiver) run(ctx context.Context, c *channel) {
|
||||
defer r.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
r.err = ctx.Err()
|
||||
return
|
||||
default:
|
||||
mh, p, err := c.recv()
|
||||
if err != nil {
|
||||
_, ok := status.FromError(err)
|
||||
if !ok {
|
||||
// treat all errors that are not an rpc status as terminal.
|
||||
// all others poison the connection.
|
||||
r.err = filterCloseErr(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
select {
|
||||
case r.messages <- &message{
|
||||
messageHeader: mh,
|
||||
p: p[:mh.Length],
|
||||
err: err,
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
r.err = ctx.Err()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) run() {
|
||||
var (
|
||||
streamID uint32 = 1
|
||||
waiters = make(map[uint32]*callRequest)
|
||||
calls = c.calls
|
||||
incoming = make(chan *message)
|
||||
receiversDone = make(chan struct{})
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
// broadcast the shutdown error to the remaining waiters.
|
||||
abortWaiters := func(wErr error) {
|
||||
for _, waiter := range waiters {
|
||||
waiter.errs <- wErr
|
||||
}
|
||||
}
|
||||
recv := &receiver{
|
||||
wg: &wg,
|
||||
messages: incoming,
|
||||
}
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(receiversDone)
|
||||
}()
|
||||
go recv.run(c.ctx, c.channel)
|
||||
|
||||
defer func() {
|
||||
c.conn.Close()
|
||||
c.userCloseFunc()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case call := <-calls:
|
||||
if err := c.send(streamID, messageTypeRequest, call.req); err != nil {
|
||||
call.errs <- err
|
||||
continue
|
||||
}
|
||||
|
||||
waiters[streamID] = call
|
||||
streamID += 2 // enforce odd client initiated request ids
|
||||
case msg := <-incoming:
|
||||
call, ok := waiters[msg.StreamID]
|
||||
if !ok {
|
||||
logrus.Errorf("ttrpc: received message for unknown channel %v", msg.StreamID)
|
||||
continue
|
||||
}
|
||||
|
||||
call.errs <- c.recv(call.resp, msg)
|
||||
delete(waiters, msg.StreamID)
|
||||
case <-receiversDone:
|
||||
// all the receivers have exited
|
||||
if recv.err != nil {
|
||||
c.setError(recv.err)
|
||||
}
|
||||
// don't return out, let the close of the context trigger the abort of waiters
|
||||
c.Close()
|
||||
case <-c.ctx.Done():
|
||||
abortWaiters(c.error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) error() error {
|
||||
c.errOnce.Do(func() {
|
||||
if c.err == nil {
|
||||
c.err = ErrClosed
|
||||
}
|
||||
})
|
||||
return c.err
|
||||
}
|
||||
|
||||
func (c *Client) setError(err error) {
|
||||
c.errOnce.Do(func() {
|
||||
c.err = err
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) send(streamID uint32, mtype messageType, msg interface{}) error {
|
||||
p, err := c.codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.channel.send(streamID, mtype, p)
|
||||
}
|
||||
|
||||
func (c *Client) recv(resp *Response, msg *message) error {
|
||||
if msg.err != nil {
|
||||
return msg.err
|
||||
}
|
||||
|
||||
if msg.Type != messageTypeResponse {
|
||||
return errors.New("unknown message type received")
|
||||
}
|
||||
|
||||
defer c.channel.putmbuf(msg.p)
|
||||
return proto.Unmarshal(msg.p, resp)
|
||||
}
|
||||
|
||||
// filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when
|
||||
// returning from call or handling errors from main read loop.
|
||||
//
|
||||
// This purposely ignores errors with a wrapped cause.
|
||||
func filterCloseErr(err error) error {
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case err == io.EOF:
|
||||
return ErrClosed
|
||||
case errors.Cause(err) == io.EOF:
|
||||
return ErrClosed
|
||||
case strings.Contains(err.Error(), "use of closed network connection"):
|
||||
return ErrClosed
|
||||
default:
|
||||
// if we have an epipe on a write, we cast to errclosed
|
||||
if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" {
|
||||
if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE {
|
||||
return ErrClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
Reference in New Issue
Block a user