Remove armhf/armv7 support from faasd

See notes in: #364

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis (OpenFaaS Ltd)
2024-06-12 10:19:41 +01:00
parent 7dbaeef3d8
commit e4848cd829
426 changed files with 20188 additions and 20620 deletions

View File

@ -1,7 +1,5 @@
linters:
enable:
- structcheck
- varcheck
- staticcheck
- unconvert
- gofmt

View File

@ -151,7 +151,7 @@ install-protobuild:
coverage: ## generate coverprofiles from the unit tests, except tests that require root
@echo "$(WHALE) $@"
@rm -f coverage.txt
@$(GO) test -i ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null
@$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null
@( for pkg in ${PACKAGES}; do \
$(GO) test ${TESTFLAGS} \
-cover \

View File

@ -71,6 +71,42 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
}
}
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
return func(c *Client) {
if len(interceptors) == 0 {
return
}
if c.interceptor != nil {
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
req *Request,
reply *Response,
info *UnaryClientInfo,
final Invoker,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
}
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
if len(interceptors) == 0 {
return final
}
return func(
ctx context.Context,
req *Request,
reply *Response,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
// NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
@ -85,13 +121,16 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx: ctx,
userCloseFunc: func() {},
userCloseWaitCh: make(chan struct{}),
interceptor: defaultClientInterceptor,
}
for _, o := range opts {
o(c)
}
if c.interceptor == nil {
c.interceptor = defaultClientInterceptor
}
go c.run()
return c
}
@ -214,60 +253,66 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.remoteClosed {
return io.EOF
}
var msg *streamMessage
select {
case <-cs.ctx.Done():
return cs.ctx.Err()
case msg, ok := <-cs.s.recv:
if !ok {
case <-cs.s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-cs.s.recv:
default:
return cs.s.recvErr
}
case msg = <-cs.s.recv:
}
if msg.header.Type == messageTypeResponse {
resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
if msg.header.Type == messageTypeResponse {
resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err
}
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err
}
if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status)
}
if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status)
}
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
}
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// Close closes the ttrpc connection and underlying connection
@ -280,7 +325,7 @@ func (c *Client) Close() error {
return nil
}
// UserOnCloseWait is used to blocks untils the user's on-close callback
// UserOnCloseWait is used to block until the user's on-close callback
// finishes.
func (c *Client) UserOnCloseWait(ctx context.Context) error {
select {
@ -341,25 +386,44 @@ func (c *Client) receiveLoop() error {
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
c.streamLock.Lock()
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
// This use of sendLock could be split into another mutex that covers stream creation + first send,
// and just use sendLock to guard writing to the wire, but for now it seems simpler to have fewer mutexes.
c.sendLock.Lock()
defer c.sendLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}
// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
var s *stream
if err := func() error {
// In the future this could be replaced with a sync.Map instead of streamLock+map.
c.streamLock.Lock()
defer c.streamLock.Unlock()
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
return ErrClosed
default:
}
s = newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
return nil
}(); err != nil {
return nil, err
}
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)
@ -477,25 +541,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
}
defer c.deleteStream(s)
var msg *streamMessage
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ctx.Done():
return ErrClosed
case msg, ok := <-s.recv:
if !ok {
case <-s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-s.recv:
default:
return s.recvErr
}
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
case msg = <-s.recv:
}
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
}

View File

@ -16,7 +16,10 @@
package ttrpc
import "errors"
import (
"context"
"errors"
)
type serverConfig struct {
handshaker Handshaker
@ -44,9 +47,40 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if c.interceptor != nil {
return errors.New("only one interceptor allowed per server")
return errors.New("only one unchained interceptor allowed per server")
}
c.interceptor = i
return nil
}
}
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if len(interceptors) == 0 {
return nil
}
if c.interceptor != nil {
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
unmarshal Unmarshaler,
info *UnaryServerInfo,
method Method) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
return nil
}
}
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
if len(interceptors) == 0 {
return method
}
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
}

View File

@ -547,7 +547,7 @@ func (c *serverConn) run(sctx context.Context) {
// branch. Basically, it means that we are no longer receiving
// requests due to a terminal error.
recvErr = nil // connection is now "closing"
if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) {
// The client went away and we should stop processing
// requests, so that the client connection is closed
return

View File

@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true)
}()
if req.Payload != nil {
// Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}

View File

@ -35,27 +35,26 @@ type stream struct {
closeOnce sync.Once
recvErr error
recvClose chan struct{}
}
func newStream(id streamID, send sender) *stream {
return &stream{
id: id,
sender: send,
recv: make(chan *streamMessage, 1),
id: id,
sender: send,
recv: make(chan *streamMessage, 1),
recvClose: make(chan struct{}),
}
}
func (s *stream) closeWithError(err error) error {
s.closeOnce.Do(func() {
if s.recv != nil {
close(s.recv)
if err != nil {
s.recvErr = err
} else {
s.recvErr = ErrClosed
}
if err != nil {
s.recvErr = err
} else {
s.recvErr = ErrClosed
}
close(s.recvClose)
})
return nil
}
@ -65,10 +64,14 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error {
}
func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
if s.recvErr != nil {
select {
case <-s.recvClose:
return s.recvErr
default:
}
select {
case <-s.recvClose:
return s.recvErr
case s.recv <- msg:
return nil
case <-ctx.Done():