Merge master into breakout_swarm

Signed-off-by: Alex Ellis <alexellis2@gmail.com>
This commit is contained in:
Alex Ellis
2018-02-01 09:25:39 +00:00
parent afeb7bbce4
commit f954bf0733
1953 changed files with 614131 additions and 175582 deletions

View File

@ -0,0 +1,214 @@
package notifications
import (
"net/http"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/uuid"
)
type bridge struct {
ub URLBuilder
actor ActorRecord
source SourceRecord
request RequestRecord
sink Sink
}
var _ Listener = &bridge{}
// URLBuilder defines a subset of url builder to be used by the event listener.
type URLBuilder interface {
BuildManifestURL(name reference.Named) (string, error)
BuildBlobURL(ref reference.Canonical) (string, error)
}
// NewBridge returns a notification listener that writes records to sink,
// using the actor and source. Any urls populated in the events created by
// this bridge will be created using the URLBuilder.
// TODO(stevvooe): Update this to simply take a context.Context object.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink Sink) Listener {
return &bridge{
ub: ub,
actor: actor,
source: source,
request: request,
sink: sink,
}
}
// NewRequestRecord builds a RequestRecord for use in NewBridge from an
// http.Request, associating it with a request id.
func NewRequestRecord(id string, r *http.Request) RequestRecord {
return RequestRecord{
ID: id,
Addr: context.RemoteAddr(r),
Host: r.Host,
Method: r.Method,
UserAgent: r.UserAgent(),
}
}
func (b *bridge) ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
manifestEvent, err := b.createManifestEvent(EventActionPush, repo, sm)
if err != nil {
return err
}
for _, option := range options {
if opt, ok := option.(distribution.WithTagOption); ok {
manifestEvent.Target.Tag = opt.Tag
break
}
}
return b.sink.Write(*manifestEvent)
}
func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
manifestEvent, err := b.createManifestEvent(EventActionPull, repo, sm)
if err != nil {
return err
}
for _, option := range options {
if opt, ok := option.(distribution.WithTagOption); ok {
manifestEvent.Target.Tag = opt.Tag
break
}
}
return b.sink.Write(*manifestEvent)
}
func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {
return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst)
}
func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionPush, repo, desc)
}
func (b *bridge) BlobPulled(repo reference.Named, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionPull, repo, desc)
}
func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error {
event, err := b.createBlobEvent(EventActionMount, repo, desc)
if err != nil {
return err
}
event.Target.FromRepository = fromRepo.Name()
return b.sink.Write(*event)
}
func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
}
func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error {
manifestEvent, err := b.createManifestEvent(action, repo, sm)
if err != nil {
return err
}
return b.sink.Write(*manifestEvent)
}
func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
event := b.createEvent(action)
event.Target.Repository = repo.Name()
event.Target.Digest = dgst
return b.sink.Write(*event)
}
func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
event := b.createEvent(action)
event.Target.Repository = repo.Name()
mt, p, err := sm.Payload()
if err != nil {
return nil, err
}
// Ensure we have the canonical manifest descriptor here
_, desc, err := distribution.UnmarshalManifest(mt, p)
if err != nil {
return nil, err
}
event.Target.MediaType = mt
event.Target.Length = desc.Size
event.Target.Size = desc.Size
event.Target.Digest = desc.Digest
ref, err := reference.WithDigest(repo, event.Target.Digest)
if err != nil {
return nil, err
}
event.Target.URL, err = b.ub.BuildManifestURL(ref)
if err != nil {
return nil, err
}
return event, nil
}
func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
event := b.createEvent(action)
event.Target.Digest = dgst
event.Target.Repository = repo.Name()
return b.sink.Write(*event)
}
func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
event, err := b.createBlobEvent(action, repo, desc)
if err != nil {
return err
}
return b.sink.Write(*event)
}
func (b *bridge) createBlobEvent(action string, repo reference.Named, desc distribution.Descriptor) (*Event, error) {
event := b.createEvent(action)
event.Target.Descriptor = desc
event.Target.Length = desc.Size
event.Target.Repository = repo.Name()
ref, err := reference.WithDigest(repo, desc.Digest)
if err != nil {
return nil, err
}
event.Target.URL, err = b.ub.BuildBlobURL(ref)
if err != nil {
return nil, err
}
return event, nil
}
// createEvent creates an event with actor and source populated.
func (b *bridge) createEvent(action string) *Event {
event := createEvent(action)
event.Source = b.source
event.Actor = b.actor
event.Request = b.request
return event
}
// createEvent returns a new event, timestamped, with the specified action.
func createEvent(action string) *Event {
return &Event{
ID: uuid.Generate().String(),
Timestamp: time.Now(),
Action: action,
}
}

View File

@ -0,0 +1,222 @@
package notifications
import (
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/uuid"
"github.com/docker/libtrust"
)
var (
// common environment for expected manifest events.
repo = "test/repo"
source = SourceRecord{
Addr: "remote.test",
InstanceID: uuid.Generate().String(),
}
ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/", false))
actor = ActorRecord{
Name: "test",
}
request = RequestRecord{}
m = schema1.Manifest{
Name: repo,
Tag: "latest",
}
sm *schema1.SignedManifest
payload []byte
dgst digest.Digest
)
func TestEventBridgeManifestPulled(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPulled(repoRef, sm); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestPushed(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPushed(repoRef, sm); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestPushedWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
}
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPushed(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestPulledWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
}
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPulled(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkDeleted(t, EventActionDelete, events...)
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestDeleted(repoRef, dgst); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func createTestEnv(t *testing.T, fn testSinkFn) Listener {
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("error generating private key: %v", err)
}
sm, err = schema1.Sign(&m, pk)
if err != nil {
t.Fatalf("error signing manifest: %v", err)
}
payload = sm.Canonical
dgst = digest.FromBytes(payload)
return NewBridge(ub, source, actor, request, fn)
}
func checkDeleted(t *testing.T, action string, events ...Event) {
if len(events) != 1 {
t.Fatalf("unexpected number of events: %v != 1", len(events))
}
event := events[0]
if event.Source != source {
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
}
if event.Request != request {
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
}
if event.Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
}
if event.Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
}
if event.Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
}
}
func checkCommonManifest(t *testing.T, action string, events ...Event) {
checkCommon(t, events...)
event := events[0]
if event.Action != action {
t.Fatalf("unexpected event action: %q != %q", event.Action, action)
}
repoRef, _ := reference.ParseNamed(repo)
ref, _ := reference.WithDigest(repoRef, dgst)
u, err := ub.BuildManifestURL(ref)
if err != nil {
t.Fatalf("error building expected url: %v", err)
}
if event.Target.URL != u {
t.Fatalf("incorrect url passed: \n%q != \n%q", event.Target.URL, u)
}
}
func checkCommon(t *testing.T, events ...Event) {
if len(events) != 1 {
t.Fatalf("unexpected number of events: %v != 1", len(events))
}
event := events[0]
if event.Source != source {
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
}
if event.Request != request {
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
}
if event.Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
}
if event.Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
}
if event.Target.Length != int64(len(payload)) {
t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload))
}
if event.Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
}
}
type testSinkFn func(events ...Event) error
func (tsf testSinkFn) Write(events ...Event) error {
return tsf(events...)
}
func (tsf testSinkFn) Close() error { return nil }
func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder {
if err != nil {
panic(err)
}
return ub
}

View File

@ -0,0 +1,93 @@
package notifications
import (
"net/http"
"time"
)
// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
Headers http.Header
Timeout time.Duration
Threshold int
Backoff time.Duration
IgnoredMediaTypes []string
Transport *http.Transport
}
// defaults set any zero-valued fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
if ec.Timeout <= 0 {
ec.Timeout = time.Second
}
if ec.Threshold <= 0 {
ec.Threshold = 10
}
if ec.Backoff <= 0 {
ec.Backoff = time.Second
}
if ec.Transport == nil {
ec.Transport = http.DefaultTransport.(*http.Transport)
}
}
// Endpoint is a reliable, queued, thread-safe sink that notify external http
// services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
Sink
url string
name string
EndpointConfig
metrics *safeMetrics
}
// NewEndpoint returns a running endpoint, ready to receive events.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
var endpoint Endpoint
endpoint.name = name
endpoint.url = url
endpoint.EndpointConfig = config
endpoint.defaults()
endpoint.metrics = newSafeMetrics()
// Configures the inmemory queue, retry, http pipeline.
endpoint.Sink = newHTTPSink(
endpoint.url, endpoint.Timeout, endpoint.Headers,
endpoint.Transport, endpoint.metrics.httpStatusListener())
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes)
register(&endpoint)
return &endpoint
}
// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
return e.name
}
// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
return e.url
}
// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
e.metrics.Lock()
defer e.metrics.Unlock()
*em = e.metrics.EndpointMetrics
// Map still need to copied in a threadsafe manner.
em.Statuses = make(map[string]int)
for k, v := range e.metrics.Statuses {
em.Statuses[k] = v
}
}

View File

@ -0,0 +1,160 @@
package notifications
import (
"fmt"
"time"
"github.com/docker/distribution"
)
// EventAction constants used in action field of Event.
const (
EventActionPull = "pull"
EventActionPush = "push"
EventActionMount = "mount"
EventActionDelete = "delete"
)
const (
// EventsMediaType is the mediatype for the json event envelope. If the
// Event, ActorRecord, SourceRecord or Envelope structs change, the version
// number should be incremented.
EventsMediaType = "application/vnd.docker.distribution.events.v1+json"
// LayerMediaType is the media type for image rootfs diffs (aka "layers")
// used by Docker. We don't expect this to change for quite a while.
layerMediaType = "application/vnd.docker.container.image.rootfs.diff+x-gtar"
)
// Envelope defines the fields of a json event envelope message that can hold
// one or more events.
type Envelope struct {
// Events make up the contents of the envelope. Events present in a single
// envelope are not necessarily related.
Events []Event `json:"events,omitempty"`
}
// TODO(stevvooe): The event type should be separate from the json format. It
// should be defined as an interface. Leaving as is for now since we don't
// need that at this time. If we make this change, the struct below would be
// called "EventRecord".
// Event provides the fields required to describe a registry event.
type Event struct {
// ID provides a unique identifier for the event.
ID string `json:"id,omitempty"`
// Timestamp is the time at which the event occurred.
Timestamp time.Time `json:"timestamp,omitempty"`
// Action indicates what action encompasses the provided event.
Action string `json:"action,omitempty"`
// Target uniquely describes the target of the event.
Target struct {
// TODO(stevvooe): Use http.DetectContentType for layers, maybe.
distribution.Descriptor
// Length in bytes of content. Same as Size field in Descriptor.
// Provided for backwards compatibility.
Length int64 `json:"length,omitempty"`
// Repository identifies the named repository.
Repository string `json:"repository,omitempty"`
// FromRepository identifies the named repository which a blob was mounted
// from if appropriate.
FromRepository string `json:"fromRepository,omitempty"`
// URL provides a direct link to the content.
URL string `json:"url,omitempty"`
// Tag provides the tag
Tag string `json:"tag,omitempty"`
} `json:"target,omitempty"`
// Request covers the request that generated the event.
Request RequestRecord `json:"request,omitempty"`
// Actor specifies the agent that initiated the event. For most
// situations, this could be from the authorizaton context of the request.
Actor ActorRecord `json:"actor,omitempty"`
// Source identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source
// "generates" it.
Source SourceRecord `json:"source,omitempty"`
}
// ActorRecord specifies the agent that initiated the event. For most
// situations, this could be from the authorizaton context of the request.
// Data in this record can refer to both the initiating client and the
// generating request.
type ActorRecord struct {
// Name corresponds to the subject or username associated with the
// request context that generated the event.
Name string `json:"name,omitempty"`
// TODO(stevvooe): Look into setting a session cookie to get this
// without docker daemon.
// SessionID
// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
// get the actual command.
// Command
}
// RequestRecord covers the request that generated the event.
type RequestRecord struct {
// ID uniquely identifies the request that initiated the event.
ID string `json:"id"`
// Addr contains the ip or hostname and possibly port of the client
// connection that initiated the event. This is the RemoteAddr from
// the standard http request.
Addr string `json:"addr,omitempty"`
// Host is the externally accessible host name of the registry instance,
// as specified by the http host header on incoming requests.
Host string `json:"host,omitempty"`
// Method has the request method that generated the event.
Method string `json:"method"`
// UserAgent contains the user agent header of the request.
UserAgent string `json:"useragent"`
}
// SourceRecord identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source "generates"
// it.
type SourceRecord struct {
// Addr contains the ip or hostname and the port of the registry node
// that generated the event. Generally, this will be resolved by
// os.Hostname() along with the running port.
Addr string `json:"addr,omitempty"`
// InstanceID identifies a running instance of an application. Changes
// after each restart.
InstanceID string `json:"instanceID,omitempty"`
}
var (
// ErrSinkClosed is returned if a write is issued to a sink that has been
// closed. If encountered, the error should be considered terminal and
// retries will not be successful.
ErrSinkClosed = fmt.Errorf("sink: closed")
)
// Sink accepts and sends events.
type Sink interface {
// Write writes one or more events to the sink. If no error is returned,
// the caller will assume that all events have been committed and will not
// try to send them again. If an error is received, the caller may retry
// sending the event. The caller should cede the slice of memory to the
// sink and not modify it after calling this method.
Write(events ...Event) error
// Close the sink, possibly waiting for pending events to flush.
Close() error
}

View File

@ -0,0 +1,157 @@
package notifications
import (
"encoding/json"
"strings"
"testing"
"time"
"github.com/docker/distribution/manifest/schema1"
)
// TestEventJSONFormat provides silly test to detect if the event format or
// envelope has changed. If this code fails, the revision of the protocol may
// need to be incremented.
func TestEventEnvelopeJSONFormat(t *testing.T) {
var expected = strings.TrimSpace(`
{
"events": [
{
"id": "asdf-asdf-asdf-asdf-0",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"mediaType": "application/vnd.docker.distribution.manifest.v1+prettyjws",
"size": 1,
"digest": "sha256:0123456789abcdef0",
"length": 1,
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
},
{
"id": "asdf-asdf-asdf-asdf-1",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
"size": 2,
"digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5",
"length": 2,
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
},
{
"id": "asdf-asdf-asdf-asdf-2",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"target": {
"mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
"size": 3,
"digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d6",
"length": 3,
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
},
"source": {
"addr": "hostname.local:port"
}
}
]
}
`)
tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
if err != nil {
t.Fatalf("error creating time: %v", err)
}
var prototype Event
prototype.Action = EventActionPush
prototype.Timestamp = tm
prototype.Actor.Name = "test-actor"
prototype.Request.ID = "asdfasdf"
prototype.Request.Addr = "client.local"
prototype.Request.Host = "registrycluster.local"
prototype.Request.Method = "PUT"
prototype.Request.UserAgent = "test/0.1"
prototype.Source.Addr = "hostname.local:port"
var manifestPush Event
manifestPush = prototype
manifestPush.ID = "asdf-asdf-asdf-asdf-0"
manifestPush.Target.Digest = "sha256:0123456789abcdef0"
manifestPush.Target.Length = 1
manifestPush.Target.Size = 1
manifestPush.Target.MediaType = schema1.MediaTypeSignedManifest
manifestPush.Target.Repository = "library/test"
manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var layerPush0 Event
layerPush0 = prototype
layerPush0.ID = "asdf-asdf-asdf-asdf-1"
layerPush0.Target.Digest = "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5"
layerPush0.Target.Length = 2
layerPush0.Target.Size = 2
layerPush0.Target.MediaType = layerMediaType
layerPush0.Target.Repository = "library/test"
layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var layerPush1 Event
layerPush1 = prototype
layerPush1.ID = "asdf-asdf-asdf-asdf-2"
layerPush1.Target.Digest = "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d6"
layerPush1.Target.Length = 3
layerPush1.Target.Size = 3
layerPush1.Target.MediaType = layerMediaType
layerPush1.Target.Repository = "library/test"
layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"
var envelope Envelope
envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
t.Fatalf("unexpected error marshaling envelope: %v", err)
}
if string(p) != expected {
t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
}
}

View File

@ -0,0 +1,150 @@
package notifications
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// httpSink implements a single-flight, http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
url string
mu sync.Mutex
closed bool
client *http.Client
listeners []httpStatusListener
// TODO(stevvooe): Allow one to configure the media type accepted by this
// sink and choose the serialization based on that.
}
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
if transport == nil {
transport = http.DefaultTransport.(*http.Transport)
}
return &httpSink{
url: u,
listeners: listeners,
client: &http.Client{
Transport: &headerRoundTripper{
Transport: transport,
headers: headers,
},
Timeout: timeout,
},
}
}
// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
success(status int, events ...Event)
failure(status int, events ...Event)
err(err error, events ...Event)
}
// Accept makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
if hs.closed {
return ErrSinkClosed
}
envelope := Envelope{
Events: events,
}
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
// retry but we are going to do it to keep the code simple. It is likely
// we could change the event struct to manage its own buffer.
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
}
body := bytes.NewReader(p)
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error posting: %v", hs, err)
}
defer resp.Body.Close()
// The notifier will treat any 2xx or 3xx response as accepted by the
// endpoint.
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400:
for _, listener := range hs.listeners {
listener.success(resp.StatusCode, events...)
}
// TODO(stevvooe): This is a little accepting: we may want to support
// unsupported media type responses with retries using the correct
// media type. There may also be cases that will never work.
return nil
default:
for _, listener := range hs.listeners {
listener.failure(resp.StatusCode, events...)
}
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
}
}
// Close the endpoint
func (hs *httpSink) Close() error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return fmt.Errorf("httpsink: already closed")
}
hs.closed = true
return nil
}
func (hs *httpSink) String() string {
return fmt.Sprintf("httpSink{%s}", hs.url)
}
type headerRoundTripper struct {
*http.Transport // must be transport to support CancelRequest
headers http.Header
}
func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var nreq http.Request
nreq = *req
nreq.Header = make(http.Header)
merge := func(headers http.Header) {
for k, v := range headers {
nreq.Header[k] = append(nreq.Header[k], v...)
}
}
merge(req.Header)
merge(hrt.headers)
return hrt.Transport.RoundTrip(&nreq)
}

View File

@ -0,0 +1,185 @@
package notifications
import (
"crypto/tls"
"encoding/json"
"fmt"
"mime"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"
"github.com/docker/distribution/manifest/schema1"
)
// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
// conditions, ensuring correct behavior.
func TestHTTPSink(t *testing.T) {
serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
t.Fatalf("unexpected request method: %v", r.Method)
return
}
// Extract the content type and make sure it matches
contentType := r.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(contentType)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
return
}
if mediaType != EventsMediaType {
w.WriteHeader(http.StatusUnsupportedMediaType)
t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
return
}
var envelope Envelope
dec := json.NewDecoder(r.Body)
if err := dec.Decode(&envelope); err != nil {
w.WriteHeader(http.StatusBadRequest)
t.Fatalf("error decoding request body: %v", err)
return
}
// Let caller choose the status
status, err := strconv.Atoi(r.FormValue("status"))
if err != nil {
t.Logf("error parsing status: %v", err)
// May just be empty, set status to 200
status = http.StatusOK
}
w.WriteHeader(status)
})
server := httptest.NewTLSServer(serverHandler)
metrics := newSafeMetrics()
sink := newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
// first make sure that the default transport gives x509 untrusted cert error
events := []Event{}
err := sink.Write(events...)
if !strings.Contains(err.Error(), "x509") {
t.Fatal("TLS server with default transport should give unknown CA error")
}
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing http sink: %v", err)
}
// make sure that passing in the transport no longer gives this error
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
sink = newHTTPSink(server.URL, 0, nil, tr,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
err = sink.Write(events...)
if err != nil {
t.Fatalf("unexpected error writing events: %v", err)
}
// reset server to standard http server and sink to a basic sink
server = httptest.NewServer(serverHandler)
sink = newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
var expectedMetrics EndpointMetrics
expectedMetrics.Statuses = make(map[string]int)
for _, tc := range []struct {
events []Event // events to send
url string
failure bool // true if there should be a failure.
statusCode int // if not set, no status code should be incremented.
}{
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest)},
},
{
statusCode: http.StatusOK,
events: []Event{
createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
createTestEvent("push", "library/test", layerMediaType),
createTestEvent("push", "library/test", layerMediaType),
},
},
{
statusCode: http.StatusTemporaryRedirect,
},
{
statusCode: http.StatusBadRequest,
failure: true,
},
{
// Case where connection never goes through.
url: "http://shoudlntresolve/",
failure: true,
},
} {
if tc.failure {
expectedMetrics.Failures += len(tc.events)
} else {
expectedMetrics.Successes += len(tc.events)
}
if tc.statusCode > 0 {
expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events)
}
url := tc.url
if url == "" {
url = server.URL + "/"
}
// setup endpoint to respond with expected status code.
url += fmt.Sprintf("?status=%v", tc.statusCode)
sink.url = url
t.Logf("testcase: %v, fail=%v", url, tc.failure)
// Try a simple event emission.
err := sink.Write(tc.events...)
if !tc.failure {
if err != nil {
t.Fatalf("unexpected error send event: %v", err)
}
} else {
if err == nil {
t.Fatalf("the endpoint should have rejected the request")
}
}
if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
}
}
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing http sink: %v", err)
}
// double close returns error
if err := sink.Close(); err == nil {
t.Fatalf("second close should have returned error: %v", err)
}
}
func createTestEvent(action, repo, typ string) Event {
event := createEvent(action)
event.Target.MediaType = typ
event.Target.Repository = repo
return *event
}

View File

@ -0,0 +1,215 @@
package notifications
import (
"net/http"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
)
// ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface {
ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
ManifestDeleted(repo reference.Named, dgst digest.Digest) error
}
// BlobListener describes a listener that can respond to layer related events.
type BlobListener interface {
BlobPushed(repo reference.Named, desc distribution.Descriptor) error
BlobPulled(repo reference.Named, desc distribution.Descriptor) error
BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
BlobDeleted(repo reference.Named, desc digest.Digest) error
}
// Listener combines all repository events into a single interface.
type Listener interface {
ManifestListener
BlobListener
}
type repositoryListener struct {
distribution.Repository
listener Listener
}
// Listen dispatches events on the repository to the listener.
func Listen(repo distribution.Repository, listener Listener) distribution.Repository {
return &repositoryListener{
Repository: repo,
listener: listener,
}
}
func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
manifests, err := rl.Repository.Manifests(ctx, options...)
if err != nil {
return nil, err
}
return &manifestServiceListener{
ManifestService: manifests,
parent: rl,
}, nil
}
func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
return &blobServiceListener{
BlobStore: rl.Repository.Blobs(ctx),
parent: rl,
}
}
type manifestServiceListener struct {
distribution.ManifestService
parent *repositoryListener
}
func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
err := msl.ManifestService.Delete(ctx, dgst)
if err == nil {
if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
context.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err)
}
}
return err
}
func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
sm, err := msl.ManifestService.Get(ctx, dgst, options...)
if err == nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
context.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err)
}
}
return sm, err
}
func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
dgst, err := msl.ManifestService.Put(ctx, sm, options...)
if err == nil {
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
context.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err)
}
}
return dgst, err
}
type blobServiceListener struct {
distribution.BlobStore
parent *repositoryListener
}
var _ distribution.BlobStore = &blobServiceListener{}
func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
p, err := bsl.BlobStore.Get(ctx, dgst)
if err == nil {
if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
}
}
}
return p, err
}
func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
rc, err := bsl.BlobStore.Open(ctx, dgst)
if err == nil {
if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
}
}
}
return rc, err
}
func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
if err == nil {
if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
}
}
}
return err
}
func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
if err == nil {
if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err)
}
}
return desc, err
}
func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
wr, err := bsl.BlobStore.Create(ctx, options...)
switch err := err.(type) {
case distribution.ErrBlobMounted:
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
context.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
}
return nil, err
}
return bsl.decorateWriter(wr), err
}
func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
err := bsl.BlobStore.Delete(ctx, dgst)
if err == nil {
if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
}
}
return err
}
func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
wr, err := bsl.BlobStore.Resume(ctx, id)
return bsl.decorateWriter(wr), err
}
func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
return &blobWriterListener{
BlobWriter: wr,
parent: bsl,
}
}
type blobWriterListener struct {
distribution.BlobWriter
parent *blobServiceListener
}
func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
committed, err := bwl.BlobWriter.Commit(ctx, desc)
if err == nil {
if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil {
context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
}
}
return committed, err
}

View File

@ -0,0 +1,205 @@
package notifications
import (
"io"
"reflect"
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/cache/memory"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/docker/libtrust"
)
func TestListener(t *testing.T) {
ctx := context.Background()
k, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatal(err)
}
registry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableDelete, storage.EnableRedirect, storage.Schema1SigningKey(k))
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
tl := &testListener{
ops: make(map[string]int),
}
repoRef, _ := reference.ParseNamed("foo/bar")
repository, err := registry.Repository(ctx, repoRef)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
repository = Listen(repository, tl)
// Now take the registry through a number of operations
checkExerciseRepository(t, repository)
expectedOps := map[string]int{
"manifest:push": 1,
"manifest:pull": 1,
"manifest:delete": 1,
"layer:push": 2,
"layer:pull": 2,
"layer:delete": 2,
}
if !reflect.DeepEqual(tl.ops, expectedOps) {
t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
}
}
type testListener struct {
ops map[string]int
}
func (tl *testListener) ManifestPushed(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
tl.ops["manifest:push"]++
return nil
}
func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
tl.ops["manifest:pull"]++
return nil
}
func (tl *testListener) ManifestDeleted(repo reference.Named, d digest.Digest) error {
tl.ops["manifest:delete"]++
return nil
}
func (tl *testListener) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
tl.ops["layer:push"]++
return nil
}
func (tl *testListener) BlobPulled(repo reference.Named, desc distribution.Descriptor) error {
tl.ops["layer:pull"]++
return nil
}
func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error {
tl.ops["layer:mount"]++
return nil
}
func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error {
tl.ops["layer:delete"]++
return nil
}
// checkExerciseRegistry takes the registry through all of its operations,
// carrying out generic checks.
func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
// TODO(stevvooe): This would be a nice testutil function. Basically, it
// takes the registry through a common set of operations. This could be
// used to make cross-cutting updates by changing internals that affect
// update counts. Basically, it would make writing tests a lot easier.
ctx := context.Background()
tag := "thetag"
// todo: change this to use Builder
m := schema1.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: repository.Named().Name(),
Tag: tag,
}
var blobDigests []digest.Digest
blobs := repository.Blobs(ctx)
for i := 0; i < 2; i++ {
rs, ds, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating test layer: %v", err)
}
dgst := digest.Digest(ds)
blobDigests = append(blobDigests, dgst)
wr, err := blobs.Create(ctx)
if err != nil {
t.Fatalf("error creating layer upload: %v", err)
}
// Use the resumes, as well!
wr, err = blobs.Resume(ctx, wr.ID())
if err != nil {
t.Fatalf("error resuming layer upload: %v", err)
}
io.Copy(wr, rs)
if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
m.FSLayers = append(m.FSLayers, schema1.FSLayer{
BlobSum: dgst,
})
m.History = append(m.History, schema1.History{
V1Compatibility: "",
})
// Then fetch the blobs
if rc, err := blobs.Open(ctx, dgst); err != nil {
t.Fatalf("error fetching layer: %v", err)
} else {
defer rc.Close()
}
}
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("unexpected error generating key: %v", err)
}
sm, err := schema1.Sign(&m, pk)
if err != nil {
t.Fatalf("unexpected error signing manifest: %v", err)
}
manifests, err := repository.Manifests(ctx)
if err != nil {
t.Fatal(err.Error())
}
var digestPut digest.Digest
if digestPut, err = manifests.Put(ctx, sm); err != nil {
t.Fatalf("unexpected error putting the manifest: %v", err)
}
dgst := digest.FromBytes(sm.Canonical)
if dgst != digestPut {
t.Fatalf("mismatching digest from payload and put")
}
_, err = manifests.Get(ctx, dgst)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
err = manifests.Delete(ctx, dgst)
if err != nil {
t.Fatalf("unexpected error deleting blob: %v", err)
}
for _, d := range blobDigests {
err = blobs.Delete(ctx, d)
if err != nil {
t.Fatalf("unexpected error deleting blob: %v", err)
}
}
}

View File

@ -0,0 +1,152 @@
package notifications
import (
"expvar"
"fmt"
"net/http"
"sync"
)
// EndpointMetrics track various actions taken by the endpoint, typically by
// number of events. The goal of this to export it via expvar but we may find
// some other future solution to be better.
type EndpointMetrics struct {
Pending int // events pending in queue
Events int // total events incoming
Successes int // total events written successfully
Failures int // total events failed
Errors int // total events errored
Statuses map[string]int // status code histogram, per call event
}
// safeMetrics guards the metrics implementation with a lock and provides a
// safe update function.
type safeMetrics struct {
EndpointMetrics
sync.Mutex // protects statuses map
}
// newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics() *safeMetrics {
var sm safeMetrics
sm.Statuses = make(map[string]int)
return &sm
}
// httpStatusListener returns the listener for the http sink that updates the
// relevant counters.
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
return &endpointMetricsHTTPStatusListener{
safeMetrics: sm,
}
}
// eventQueueListener returns a listener that maintains queue related counters.
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
return &endpointMetricsEventQueueListener{
safeMetrics: sm,
}
}
// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevant events.
type endpointMetricsHTTPStatusListener struct {
*safeMetrics
}
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Successes += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Failures += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Errors += len(events)
}
// endpointMetricsEventQueueListener maintains the incoming events counter and
// the queues pending count.
type endpointMetricsEventQueueListener struct {
*safeMetrics
}
func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Events += len(events)
eqc.Pending += len(events)
}
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Pending -= len(events)
}
// endpoints is global registry of endpoints used to report metrics to expvar
var endpoints struct {
registered []*Endpoint
mu sync.Mutex
}
// register places the endpoint into expvar so that stats are tracked.
func register(e *Endpoint) {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
endpoints.registered = append(endpoints.registered, e)
}
func init() {
// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
// Ideally, we do more metrics through logging but we need some nice
// realtime metrics for queue state for now.
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
var notifications expvar.Map
notifications.Init()
notifications.Set("endpoints", expvar.Func(func() interface{} {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
var names []interface{}
for _, v := range endpoints.registered {
var epjson struct {
Name string `json:"name"`
URL string `json:"url"`
EndpointConfig
Metrics EndpointMetrics
}
epjson.Name = v.Name()
epjson.URL = v.URL()
epjson.EndpointConfig = v.EndpointConfig
v.ReadMetrics(&epjson.Metrics)
names = append(names, epjson)
}
return names
}))
registry.(*expvar.Map).Set("notifications", &notifications)
}

View File

@ -0,0 +1,375 @@
package notifications
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but others are suitable for export if the need arises. Albeit,
// the tight integration with endpoint metrics should be removed.
// Broadcaster sends events to multiple, reliable Sinks. The goal of this
// component is to dispatch events to configured endpoints. Reliability can be
// provided by wrapping incoming sinks.
type Broadcaster struct {
sinks []Sink
events chan []Event
closed chan chan struct{}
}
// NewBroadcaster ...
// Add appends one or more sinks to the list of sinks. The broadcaster
// behavior will be affected by the properties of the sink. Generally, the
// sink should accept all messages and deal with reliability on its own. Use
// of EventQueue and RetryingSink should be used here.
func NewBroadcaster(sinks ...Sink) *Broadcaster {
b := Broadcaster{
sinks: sinks,
events: make(chan []Event),
closed: make(chan chan struct{}),
}
// Start the broadcaster
go b.run()
return &b
}
// Write accepts a block of events to be dispatched to all sinks. This method
// will never fail and should never block (hopefully!). The caller cedes the
// slice memory to the broadcaster and should not modify it after calling
// write.
func (b *Broadcaster) Write(events ...Event) error {
select {
case b.events <- events:
case <-b.closed:
return ErrSinkClosed
}
return nil
}
// Close the broadcaster, ensuring that all messages are flushed to the
// underlying sink before returning.
func (b *Broadcaster) Close() error {
logrus.Infof("broadcaster: closing")
select {
case <-b.closed:
// already closed
return fmt.Errorf("broadcaster: already closed")
default:
// do a little chan handoff dance to synchronize closing
closed := make(chan struct{})
b.closed <- closed
close(b.closed)
<-closed
return nil
}
}
// run is the main broadcast loop, started when the broadcaster is created.
// Under normal conditions, it waits for events on the event channel. After
// Close is called, this goroutine will exit.
func (b *Broadcaster) run() {
for {
select {
case block := <-b.events:
for _, sink := range b.sinks {
if err := sink.Write(block...); err != nil {
logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
}
}
case closing := <-b.closed:
// close all the underlying sinks
for _, sink := range b.sinks {
if err := sink.Close(); err != nil {
logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
}
}
closing <- struct{}{}
logrus.Debugf("broadcaster: closed")
return
}
}
}
// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread safe but the sink must be reliable or
// events will be dropped.
type eventQueue struct {
sink Sink
events *list.List
listeners []eventQueueListener
cond *sync.Cond
mu sync.Mutex
closed bool
}
// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
ingress(events ...Event)
egress(events ...Event)
}
// newEventQueue returns a queue to the provided sink. If the updater is non-
// nil, it will be called to update pending metrics on ingress and egress.
func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
eq := eventQueue{
sink: sink,
events: list.New(),
listeners: listeners,
}
eq.cond = sync.NewCond(&eq.mu)
go eq.run()
return &eq
}
// Write accepts the events into the queue, only failing if the queue has
// beend closed.
func (eq *eventQueue) Write(events ...Event) error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return ErrSinkClosed
}
for _, listener := range eq.listeners {
listener.ingress(events...)
}
eq.events.PushBack(events)
eq.cond.Signal() // signal waiters
return nil
}
// Close shutsdown the event queue, flushing
func (eq *eventQueue) Close() error {
eq.mu.Lock()
defer eq.mu.Unlock()
if eq.closed {
return fmt.Errorf("eventqueue: already closed")
}
// set closed flag
eq.closed = true
eq.cond.Signal() // signal flushes queue
eq.cond.Wait() // wait for signal from last flush
return eq.sink.Close()
}
// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
for {
block := eq.next()
if block == nil {
return // nil block means event queue is closed.
}
if err := eq.sink.Write(block...); err != nil {
logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
}
for _, listener := range eq.listeners {
listener.egress(block...)
}
}
}
// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *eventQueue) next() []Event {
eq.mu.Lock()
defer eq.mu.Unlock()
for eq.events.Len() < 1 {
if eq.closed {
eq.cond.Broadcast()
return nil
}
eq.cond.Wait()
}
front := eq.events.Front()
block := front.Value.([]Event)
eq.events.Remove(front)
return block
}
// ignoredMediaTypesSink discards events with ignored target media types and
// passes the rest along.
type ignoredMediaTypesSink struct {
Sink
ignored map[string]bool
}
func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink {
if len(ignored) == 0 {
return sink
}
ignoredMap := make(map[string]bool)
for _, mediaType := range ignored {
ignoredMap[mediaType] = true
}
return &ignoredMediaTypesSink{
Sink: sink,
ignored: ignoredMap,
}
}
// Write discards events with ignored target media types and passes the rest
// along.
func (imts *ignoredMediaTypesSink) Write(events ...Event) error {
var kept []Event
for _, e := range events {
if !imts.ignored[e.Target.MediaType] {
kept = append(kept, e)
}
}
if len(kept) == 0 {
return nil
}
return imts.Sink.Write(kept...)
}
// retryingSink retries the write until success or an ErrSinkClosed is
// returned. Underlying sink must have p > 0 of succeeding or the sink will
// block. Internally, it is a circuit breaker retries to manage reset.
// Concurrent calls to a retrying sink are serialized through the sink,
// meaning that if one is in-flight, another will not proceed.
type retryingSink struct {
mu sync.Mutex
sink Sink
closed bool
// circuit breaker heuristics
failures struct {
threshold int
recent int
last time.Time
backoff time.Duration // time after which we retry after failure.
}
}
type retryingSinkListener interface {
active(events ...Event)
retry(events ...Event)
}
// TODO(stevvooe): We are using circuit break here, which actually doesn't
// make a whole lot of sense for this use case, since we always retry. Move
// this to use bounded exponential backoff.
// newRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. Parameters threshold and backoff adjust the behavior of the
// circuit breaker.
func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
rs := &retryingSink{
sink: sink,
}
rs.failures.threshold = threshold
rs.failures.backoff = backoff
return rs
}
// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *retryingSink) Write(events ...Event) error {
rs.mu.Lock()
defer rs.mu.Unlock()
retry:
if rs.closed {
return ErrSinkClosed
}
if !rs.proceed() {
logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
rs.wait(rs.failures.backoff)
goto retry
}
if err := rs.write(events...); err != nil {
if err == ErrSinkClosed {
// terminal!
return err
}
logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
goto retry
}
return nil
}
// Close closes the sink and the underlying sink.
func (rs *retryingSink) Close() error {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.closed {
return fmt.Errorf("retryingsink: already closed")
}
rs.closed = true
return rs.sink.Close()
}
// write provides a helper that dispatches failure and success properly. Used
// by write as the single-flight write call.
func (rs *retryingSink) write(events ...Event) error {
if err := rs.sink.Write(events...); err != nil {
rs.failure()
return err
}
rs.reset()
return nil
}
// wait backoff time against the sink, unlocking so others can proceed. Should
// only be called by methods that currently have the mutex.
func (rs *retryingSink) wait(backoff time.Duration) {
rs.mu.Unlock()
defer rs.mu.Lock()
// backoff here
time.Sleep(backoff)
}
// reset marks a successful call.
func (rs *retryingSink) reset() {
rs.failures.recent = 0
rs.failures.last = time.Time{}
}
// failure records a failure.
func (rs *retryingSink) failure() {
rs.failures.recent++
rs.failures.last = time.Now().UTC()
}
// proceed returns true if the call should proceed based on circuit breaker
// heuristics.
func (rs *retryingSink) proceed() bool {
return rs.failures.recent < rs.failures.threshold ||
time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
}

View File

@ -0,0 +1,256 @@
package notifications
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
"github.com/Sirupsen/logrus"
"testing"
)
func TestBroadcaster(t *testing.T) {
const nEvents = 1000
var sinks []Sink
for i := 0; i < 10; i++ {
sinks = append(sinks, &testSink{})
}
b := NewBroadcaster(sinks...)
var block []Event
var wg sync.WaitGroup
for i := 1; i <= nEvents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := b.Write(block...); err != nil {
t.Fatalf("error writing block of length %d: %v", len(block), err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait() // Wait until writes complete
checkClose(t, b)
// Iterate through the sinks and check that they all have the expected length.
for _, sink := range sinks {
ts := sink.(*testSink)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != nEvents {
t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
}
}
func TestEventQueue(t *testing.T) {
const nevents = 1000
var ts testSink
metrics := newSafeMetrics()
eq := newEventQueue(
// delayed sync simulates destination slower than channel comms
&delayedSink{
Sink: &ts,
delay: time.Millisecond * 1,
}, metrics.eventQueueListener())
var wg sync.WaitGroup
var block []Event
for i := 1; i <= nevents; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
if err := eq.Write(block...); err != nil {
t.Fatalf("error writing event block: %v", err)
}
wg.Done()
}(block...)
block = nil
}
}
wg.Wait()
checkClose(t, eq)
ts.mu.Lock()
defer ts.mu.Unlock()
metrics.Lock()
defer metrics.Unlock()
if len(ts.events) != nevents {
t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
}
if !ts.closed {
t.Fatalf("sink should have been closed")
}
if metrics.Events != nevents {
t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
}
if metrics.Pending != 0 {
t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
}
}
func TestIgnoredMediaTypesSink(t *testing.T) {
blob := createTestEvent("push", "library/test", "blob")
manifest := createTestEvent("push", "library/test", "manifest")
type testcase struct {
ignored []string
expected []Event
}
cases := []testcase{
{nil, []Event{blob, manifest}},
{[]string{"other"}, []Event{blob, manifest}},
{[]string{"blob"}, []Event{manifest}},
{[]string{"blob", "manifest"}, nil},
}
for _, c := range cases {
ts := &testSink{}
s := newIgnoredMediaTypesSink(ts, c.ignored)
if err := s.Write(blob, manifest); err != nil {
t.Fatalf("error writing event: %v", err)
}
ts.mu.Lock()
if !reflect.DeepEqual(ts.events, c.expected) {
t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
}
ts.mu.Unlock()
}
}
func TestRetryingSink(t *testing.T) {
// Make a sync that fails most of the time, ensuring that all the events
// make it through.
var ts testSink
flaky := &flakySink{
rate: 1.0, // start out always failing.
Sink: &ts,
}
s := newRetryingSink(flaky, 3, 10*time.Millisecond)
var wg sync.WaitGroup
var block []Event
for i := 1; i <= 100; i++ {
block = append(block, createTestEvent("push", "library/test", "blob"))
// Above 50, set the failure rate lower
if i > 50 {
s.mu.Lock()
flaky.rate = 0.90
s.mu.Unlock()
}
if i%10 == 0 && i > 0 {
wg.Add(1)
go func(block ...Event) {
defer wg.Done()
if err := s.Write(block...); err != nil {
t.Fatalf("error writing event block: %v", err)
}
}(block...)
block = nil
}
}
wg.Wait()
checkClose(t, s)
ts.mu.Lock()
defer ts.mu.Unlock()
if len(ts.events) != 100 {
t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
}
}
type testSink struct {
events []Event
mu sync.Mutex
closed bool
}
func (ts *testSink) Write(events ...Event) error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.events = append(ts.events, events...)
return nil
}
func (ts *testSink) Close() error {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.closed = true
logrus.Infof("closing testSink")
return nil
}
type delayedSink struct {
Sink
delay time.Duration
}
func (ds *delayedSink) Write(events ...Event) error {
time.Sleep(ds.delay)
return ds.Sink.Write(events...)
}
type flakySink struct {
Sink
rate float64
}
func (fs *flakySink) Write(events ...Event) error {
if rand.Float64() < fs.rate {
return fmt.Errorf("error writing %d events", len(events))
}
return fs.Sink.Write(events...)
}
func checkClose(t *testing.T, sink Sink) {
if err := sink.Close(); err != nil {
t.Fatalf("unexpected error closing: %v", err)
}
// second close should not crash but should return an error.
if err := sink.Close(); err == nil {
t.Fatalf("no error on double close")
}
// Write after closed should be an error
if err := sink.Write([]Event{}...); err == nil {
t.Fatalf("write after closed did not have an error")
} else if err != ErrSinkClosed {
t.Fatalf("error should be ErrSinkClosed")
}
}