File notifications.obscpio of Package distribution-container

---- notifications/bridge.go ----
package notifications

import (
	"net/http"
	"time"

	"github.com/distribution/distribution/v3"
	"github.com/distribution/distribution/v3/context"
	"github.com/distribution/distribution/v3/reference"
	"github.com/distribution/distribution/v3/uuid"
	events "github.com/docker/go-events"
	"github.com/opencontainers/go-digest"
)

type bridge struct {
	ub                URLBuilder
	includeReferences bool
	actor             ActorRecord
	source            SourceRecord
	request           RequestRecord
	sink              events.Sink
}

var _ Listener = &bridge{}

// URLBuilder defines the subset of a URL builder to be used by the event listener.
type URLBuilder interface {
	BuildManifestURL(name reference.Named) (string, error)
	BuildBlobURL(ref reference.Canonical) (string, error)
}

// NewBridge returns a notification listener that writes records to sink,
// using the actor and source. Any URLs populated in the events created by
// this bridge will be built using the URLBuilder.
// TODO(stevvooe): Update this to simply take a context.Context object.
func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, request RequestRecord, sink events.Sink, includeReferences bool) Listener {
	return &bridge{
		ub:                ub,
		includeReferences: includeReferences,
		actor:             actor,
		source:            source,
		request:           request,
		sink:              sink,
	}
}

// NewRequestRecord builds a RequestRecord for use in NewBridge from an
// http.Request, associating it with a request id.
func NewRequestRecord(id string, r *http.Request) RequestRecord {
	return RequestRecord{
		ID:        id,
		Addr:      context.RemoteAddr(r),
		Host:      r.Host,
		Method:    r.Method,
		UserAgent: r.UserAgent(),
	}
}
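
// A minimal usage sketch, assuming ub, source, actor, and sink have been
// configured elsewhere (for example, by the registry handler wiring) and r
// is the incoming *http.Request; the values are illustrative only:
//
//	request := NewRequestRecord(uuid.Generate().String(), r)
//	listener := NewBridge(ub, source, actor, request, sink, true)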

func (b *bridge) ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
	manifestEvent, err := b.createManifestEvent(EventActionPush, repo, sm)
	if err != nil {
		return err
	}

	for _, option := range options {
		if opt, ok := option.(distribution.WithTagOption); ok {
			manifestEvent.Target.Tag = opt.Tag
			break
		}
	}
	return b.sink.Write(*manifestEvent)
}

func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
	manifestEvent, err := b.createManifestEvent(EventActionPull, repo, sm)
	if err != nil {
		return err
	}

	for _, option := range options {
		if opt, ok := option.(distribution.WithTagOption); ok {
			manifestEvent.Target.Tag = opt.Tag
			break
		}
	}
	return b.sink.Write(*manifestEvent)
}

func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {
	return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst)
}

func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
	return b.createBlobEventAndWrite(EventActionPush, repo, desc)
}

func (b *bridge) BlobPulled(repo reference.Named, desc distribution.Descriptor) error {
	return b.createBlobEventAndWrite(EventActionPull, repo, desc)
}

func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error {
	event, err := b.createBlobEvent(EventActionMount, repo, desc)
	if err != nil {
		return err
	}
	event.Target.FromRepository = fromRepo.Name()
	return b.sink.Write(*event)
}

func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error {
	return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst)
}

func (b *bridge) TagDeleted(repo reference.Named, tag string) error {
	event := b.createEvent(EventActionDelete)
	event.Target.Repository = repo.Name()
	event.Target.Tag = tag

	return b.sink.Write(*event)
}

func (b *bridge) RepoDeleted(repo reference.Named) error {
	event := b.createEvent(EventActionDelete)
	event.Target.Repository = repo.Name()

	return b.sink.Write(*event)
}

func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
	event := b.createEvent(action)
	event.Target.Repository = repo.Name()
	event.Target.Digest = dgst

	return b.sink.Write(*event)
}

func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) {
	event := b.createEvent(action)
	event.Target.Repository = repo.Name()

	mt, p, err := sm.Payload()
	if err != nil {
		return nil, err
	}

	// Ensure we have the canonical manifest descriptor here
	manifest, desc, err := distribution.UnmarshalManifest(mt, p)
	if err != nil {
		return nil, err
	}

	event.Target.MediaType = mt
	event.Target.Length = desc.Size
	event.Target.Size = desc.Size
	event.Target.Digest = desc.Digest
	if b.includeReferences {
		event.Target.References = append(event.Target.References, manifest.References()...)
	}

	ref, err := reference.WithDigest(repo, event.Target.Digest)
	if err != nil {
		return nil, err
	}

	event.Target.URL, err = b.ub.BuildManifestURL(ref)
	if err != nil {
		return nil, err
	}

	return event, nil
}

func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error {
	event := b.createEvent(action)
	event.Target.Digest = dgst
	event.Target.Repository = repo.Name()

	return b.sink.Write(*event)
}

func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error {
	event, err := b.createBlobEvent(action, repo, desc)
	if err != nil {
		return err
	}

	return b.sink.Write(*event)
}

func (b *bridge) createBlobEvent(action string, repo reference.Named, desc distribution.Descriptor) (*Event, error) {
	event := b.createEvent(action)
	event.Target.Descriptor = desc
	event.Target.Length = desc.Size
	event.Target.Repository = repo.Name()

	ref, err := reference.WithDigest(repo, desc.Digest)
	if err != nil {
		return nil, err
	}

	event.Target.URL, err = b.ub.BuildBlobURL(ref)
	if err != nil {
		return nil, err
	}

	return event, nil
}

// createEvent creates an event with the actor, source and request populated.
func (b *bridge) createEvent(action string) *Event {
	event := createEvent(action)
	event.Source = b.source
	event.Actor = b.actor
	event.Request = b.request

	return event
}

// createEvent returns a new event, timestamped, with the specified action.
func createEvent(action string) *Event {
	return &Event{
		ID:        uuid.Generate().String(),
		Timestamp: time.Now(),
		Action:    action,
	}
}
---- notifications/bridge_test.go ----
package notifications

import (
	"testing"

	"github.com/distribution/distribution/v3"
	"github.com/distribution/distribution/v3/manifest/schema1"
	"github.com/distribution/distribution/v3/reference"
	v2 "github.com/distribution/distribution/v3/registry/api/v2"
	"github.com/distribution/distribution/v3/uuid"
	events "github.com/docker/go-events"
	"github.com/docker/libtrust"
	"github.com/opencontainers/go-digest"
)

var (
	// common environment for expected manifest events.

	repo   = "test/repo"
	source = SourceRecord{
		Addr:       "remote.test",
		InstanceID: uuid.Generate().String(),
	}
	ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/", false))

	actor = ActorRecord{
		Name: "test",
	}
	request = RequestRecord{}
	layers  = []schema1.FSLayer{
		{
			BlobSum: "asdf",
		},
		{
			BlobSum: "qwer",
		},
	}
	m = schema1.Manifest{
		Name:     repo,
		Tag:      "latest",
		FSLayers: layers,
	}

	sm      *schema1.SignedManifest
	payload []byte
	dgst    digest.Digest
)

func TestEventBridgeManifestPulled(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkCommonManifest(t, EventActionPull, event)

		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.ManifestPulled(repoRef, sm); err != nil {
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
	}
}

func TestEventBridgeManifestPushed(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkCommonManifest(t, EventActionPush, event)

		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.ManifestPushed(repoRef, sm); err != nil {
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
	}
}

func TestEventBridgeManifestPushedWithTag(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkCommonManifest(t, EventActionPush, event)
		if event.(Event).Target.Tag != "latest" {
			t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target)
		}

		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.ManifestPushed(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
	}
}

func TestEventBridgeManifestPulledWithTag(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkCommonManifest(t, EventActionPull, event)
		if event.(Event).Target.Tag != "latest" {
			t.Fatalf("missing or unexpected tag: %#v", event.(Event).Target)
		}

		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.ManifestPulled(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
	}
}

func TestEventBridgeManifestDeleted(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkDeleted(t, EventActionDelete, event)
		if event.(Event).Target.Digest != dgst {
			t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst)
		}
		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.ManifestDeleted(repoRef, dgst); err != nil {
		t.Fatalf("unexpected error notifying manifest pull: %v", err)
	}
}

func TestEventBridgeTagDeleted(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkDeleted(t, EventActionDelete, event)
		if event.(Event).Target.Tag != m.Tag {
			t.Fatalf("unexpected tag on event target: %q != %q", event.(Event).Target.Tag, m.Tag)
		}
		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.TagDeleted(repoRef, m.Tag); err != nil {
		t.Fatalf("unexpected error notifying tag deletion: %v", err)
	}
}

func TestEventBridgeRepoDeleted(t *testing.T) {
	l := createTestEnv(t, testSinkFn(func(event events.Event) error {
		checkDeleted(t, EventActionDelete, event)
		return nil
	}))

	repoRef, _ := reference.WithName(repo)
	if err := l.RepoDeleted(repoRef); err != nil {
		t.Fatalf("unexpected error notifying repo deletion: %v", err)
	}
}

func createTestEnv(t *testing.T, fn testSinkFn) Listener {
	pk, err := libtrust.GenerateECP256PrivateKey()
	if err != nil {
		t.Fatalf("error generating private key: %v", err)
	}

	sm, err = schema1.Sign(&m, pk)
	if err != nil {
		t.Fatalf("error signing manifest: %v", err)
	}

	payload = sm.Canonical
	dgst = digest.FromBytes(payload)

	return NewBridge(ub, source, actor, request, fn, true)
}

func checkDeleted(t *testing.T, action string, event events.Event) {
	if event.(Event).Source != source {
		t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source)
	}

	if event.(Event).Request != request {
		t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request)
	}

	if event.(Event).Actor != actor {
		t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor)
	}

	if event.(Event).Target.Repository != repo {
		t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo)
	}
}

func checkCommonManifest(t *testing.T, action string, event events.Event) {
	checkCommon(t, event)

	if event.(Event).Action != action {
		t.Fatalf("unexpected event action: %q != %q", event.(Event).Action, action)
	}

	repoRef, _ := reference.WithName(repo)
	ref, _ := reference.WithDigest(repoRef, dgst)
	u, err := ub.BuildManifestURL(ref)
	if err != nil {
		t.Fatalf("error building expected url: %v", err)
	}

	if event.(Event).Target.URL != u {
		t.Fatalf("incorrect url passed: \n%q != \n%q", event.(Event).Target.URL, u)
	}

	if len(event.(Event).Target.References) != len(layers) {
		t.Fatalf("unexpected number of references %v != %v", len(event.(Event).Target.References), len(layers))
	}
	for i, targetReference := range event.(Event).Target.References {
		if targetReference.Digest != layers[i].BlobSum {
			t.Fatalf("unexpected reference: %q != %q", targetReference.Digest, layers[i].BlobSum)
		}
	}
}

func checkCommon(t *testing.T, event events.Event) {
	if event.(Event).Source != source {
		t.Fatalf("source not equal: %#v != %#v", event.(Event).Source, source)
	}

	if event.(Event).Request != request {
		t.Fatalf("request not equal: %#v != %#v", event.(Event).Request, request)
	}

	if event.(Event).Actor != actor {
		t.Fatalf("request not equal: %#v != %#v", event.(Event).Actor, actor)
	}

	if event.(Event).Target.Digest != dgst {
		t.Fatalf("unexpected digest on event target: %q != %q", event.(Event).Target.Digest, dgst)
	}

	if event.(Event).Target.Length != int64(len(payload)) {
		t.Fatalf("unexpected target length: %v != %v", event.(Event).Target.Length, len(payload))
	}

	if event.(Event).Target.Repository != repo {
		t.Fatalf("unexpected repository: %q != %q", event.(Event).Target.Repository, repo)
	}

}

type testSinkFn func(event events.Event) error

func (tsf testSinkFn) Write(event events.Event) error {
	return tsf(event)
}

func (tsf testSinkFn) Close() error { return nil }

func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder {
	if err != nil {
		panic(err)
	}

	return ub
}
---- notifications/endpoint.go ----
package notifications

import (
	"net/http"
	"time"

	"github.com/distribution/distribution/v3/configuration"
	events "github.com/docker/go-events"
)

// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
	Headers           http.Header
	Timeout           time.Duration
	Threshold         int
	Backoff           time.Duration
	IgnoredMediaTypes []string
	Transport         *http.Transport `json:"-"`
	Ignore            configuration.Ignore
}

// defaults sets any zero-valued fields to reasonable defaults.
func (ec *EndpointConfig) defaults() {
	if ec.Timeout <= 0 {
		ec.Timeout = time.Second
	}

	if ec.Threshold <= 0 {
		ec.Threshold = 10
	}

	if ec.Backoff <= 0 {
		ec.Backoff = time.Second
	}

	if ec.Transport == nil {
		ec.Transport = http.DefaultTransport.(*http.Transport)
	}
}

// Endpoint is a reliable, queued, thread-safe sink that notifies external
// http services when events are written. Writes are non-blocking and always
// succeed for callers, but events may be queued internally.
type Endpoint struct {
	events.Sink
	url  string
	name string

	EndpointConfig

	metrics *safeMetrics
}

// NewEndpoint returns a running endpoint, ready to receive events.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
	var endpoint Endpoint
	endpoint.name = name
	endpoint.url = url
	endpoint.EndpointConfig = config
	endpoint.defaults()
	endpoint.metrics = newSafeMetrics(name)

	// Configure the in-memory queue, retry, and http pipeline.
	endpoint.Sink = newHTTPSink(
		endpoint.url, endpoint.Timeout, endpoint.Headers,
		endpoint.Transport, endpoint.metrics.httpStatusListener())
	endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff))
	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
	mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...)
	endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)

	register(&endpoint)
	return &endpoint
}
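
// A minimal usage sketch; the endpoint name and URL are illustrative, and
// zero-valued config fields are filled in by defaults():
//
//	ep := NewEndpoint("alistener", "http://listener.example.test/events", EndpointConfig{})
//	_ = ep.Write(event) // non-blocking; the event is queued for delivery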

// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
	return e.name
}

// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
	return e.url
}

// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
	e.metrics.Lock()
	defer e.metrics.Unlock()

	*em = e.metrics.EndpointMetrics
	// The Statuses map still needs to be copied in a threadsafe manner.
	em.Statuses = make(map[string]int)
	for k, v := range e.metrics.Statuses {
		em.Statuses[k] = v
	}
}
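
// A small sketch of polling metrics from an endpoint created above; the
// variable names are illustrative:
//
//	var em EndpointMetrics
//	ep.ReadMetrics(&em)
//	fmt.Printf("pending=%d successes=%d failures=%d\n", em.Pending, em.Successes, em.Failures)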
---- notifications/event.go ----
package notifications

import (
	"fmt"
	"time"

	"github.com/distribution/distribution/v3"
	events "github.com/docker/go-events"
)

// EventAction constants used in action field of Event.
const (
	EventActionPull   = "pull"
	EventActionPush   = "push"
	EventActionMount  = "mount"
	EventActionDelete = "delete"
)

const (
	// EventsMediaType is the mediatype for the json event envelope. If the
	// Event, ActorRecord, SourceRecord or Envelope structs change, the version
	// number should be incremented.
	EventsMediaType = "application/vnd.docker.distribution.events.v1+json"
	// layerMediaType is the media type for image rootfs diffs (aka "layers")
	// used by Docker. We don't expect this to change for quite a while.
	layerMediaType = "application/vnd.docker.container.image.rootfs.diff+x-gtar"
)

// Envelope defines the fields of a json event envelope message that can hold
// one or more events.
type Envelope struct {
	// Events make up the contents of the envelope. Events present in a single
	// envelope are not necessarily related.
	Events []events.Event `json:"events,omitempty"`
}

// TODO(stevvooe): The event type should be separate from the json format. It
// should be defined as an interface. Leaving as is for now since we don't
// need that at this time. If we make this change, the struct below would be
// called "EventRecord".

// Event provides the fields required to describe a registry event.
type Event struct {
	// ID provides a unique identifier for the event.
	ID string `json:"id,omitempty"`

	// Timestamp is the time at which the event occurred.
	Timestamp time.Time `json:"timestamp,omitempty"`

	// Action indicates what action encompasses the provided event.
	Action string `json:"action,omitempty"`

	// Target uniquely describes the target of the event.
	Target struct {
		// TODO(stevvooe): Use http.DetectContentType for layers, maybe.

		distribution.Descriptor

		// Length in bytes of content. Same as Size field in Descriptor.
		// Provided for backwards compatibility.
		Length int64 `json:"length,omitempty"`

		// Repository identifies the named repository.
		Repository string `json:"repository,omitempty"`

		// FromRepository identifies the named repository which a blob was mounted
		// from if appropriate.
		FromRepository string `json:"fromRepository,omitempty"`

		// URL provides a direct link to the content.
		URL string `json:"url,omitempty"`

		// Tag provides the tag associated with the target, if any.
		Tag string `json:"tag,omitempty"`

		// References provides the descriptors referenced by the target.
		References []distribution.Descriptor `json:"references,omitempty"`
	} `json:"target,omitempty"`

	// Request covers the request that generated the event.
	Request RequestRecord `json:"request,omitempty"`

	// Actor specifies the agent that initiated the event. For most
	// situations, this could be from the authorization context of the request.
	Actor ActorRecord `json:"actor,omitempty"`

	// Source identifies the registry node that generated the event. Put
	// differently, while the actor "initiates" the event, the source
	// "generates" it.
	Source SourceRecord `json:"source,omitempty"`
}

// ActorRecord specifies the agent that initiated the event. For most
// situations, this could be from the authorization context of the request.
// Data in this record can refer to both the initiating client and the
// generating request.
type ActorRecord struct {
	// Name corresponds to the subject or username associated with the
	// request context that generated the event.
	Name string `json:"name,omitempty"`

	// TODO(stevvooe): Look into setting a session cookie to get this
	// without docker daemon.
	//    SessionID

	// TODO(stevvooe): Push the "Docker-Command" header to replace cookie and
	// get the actual command.
	//    Command
}

// RequestRecord covers the request that generated the event.
type RequestRecord struct {
	// ID uniquely identifies the request that initiated the event.
	ID string `json:"id"`

	// Addr contains the ip or hostname and possibly port of the client
	// connection that initiated the event. This is the RemoteAddr from
	// the standard http request.
	Addr string `json:"addr,omitempty"`

	// Host is the externally accessible host name of the registry instance,
	// as specified by the http host header on incoming requests.
	Host string `json:"host,omitempty"`

	// Method has the request method that generated the event.
	Method string `json:"method"`

	// UserAgent contains the user agent header of the request.
	UserAgent string `json:"useragent"`
}

// SourceRecord identifies the registry node that generated the event. Put
// differently, while the actor "initiates" the event, the source "generates"
// it.
type SourceRecord struct {
	// Addr contains the ip or hostname and the port of the registry node
	// that generated the event. Generally, this will be resolved by
	// os.Hostname() along with the running port.
	Addr string `json:"addr,omitempty"`

	// InstanceID identifies a running instance of an application. Changes
	// after each restart.
	InstanceID string `json:"instanceID,omitempty"`
}

var (
	// ErrSinkClosed is returned if a write is issued to a sink that has been
	// closed. If encountered, the error should be considered terminal and
	// retries will not be successful.
	ErrSinkClosed = fmt.Errorf("sink: closed")
)
---- notifications/event_test.go ----
package notifications

import (
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/distribution/distribution/v3/manifest/schema1"
)

// TestEventEnvelopeJSONFormat provides a simple test to detect if the event
// format or envelope has changed. If this test fails, the revision of the
// protocol may need to be incremented.
func TestEventEnvelopeJSONFormat(t *testing.T) {
	var expected = strings.TrimSpace(`
{
   "events": [
      {
         "id": "asdf-asdf-asdf-asdf-0",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "mediaType": "application/vnd.docker.distribution.manifest.v1+prettyjws",
            "size": 1,
            "digest": "sha256:0123456789abcdef0",
            "length": 1,
            "repository": "library/test",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      },
      {
         "id": "asdf-asdf-asdf-asdf-1",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
            "size": 2,
            "digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5",
            "length": 2,
            "repository": "library/test",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      },
      {
         "id": "asdf-asdf-asdf-asdf-2",
         "timestamp": "2006-01-02T15:04:05Z",
         "action": "push",
         "target": {
            "mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
            "size": 3,
            "digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d6",
            "length": 3,
            "repository": "library/test",
            "url": "http://example.com/v2/library/test/manifests/latest"
         },
         "request": {
            "id": "asdfasdf",
            "addr": "client.local",
            "host": "registrycluster.local",
            "method": "PUT",
            "useragent": "test/0.1"
         },
         "actor": {
            "name": "test-actor"
         },
         "source": {
            "addr": "hostname.local:port"
         }
      }
   ]
}
	`)

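	// Parse the RFC3339 layout string itself, with the trailing "07:00"
	// offset trimmed, which yields the fixed timestamp 2006-01-02T15:04:05Z.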
	tm, err := time.Parse(time.RFC3339, time.RFC3339[:len(time.RFC3339)-5])
	if err != nil {
		t.Fatalf("error creating time: %v", err)
	}

	var prototype Event
	prototype.Action = EventActionPush
	prototype.Timestamp = tm
	prototype.Actor.Name = "test-actor"
	prototype.Request.ID = "asdfasdf"
	prototype.Request.Addr = "client.local"
	prototype.Request.Host = "registrycluster.local"
	prototype.Request.Method = "PUT"
	prototype.Request.UserAgent = "test/0.1"
	prototype.Source.Addr = "hostname.local:port"

	var manifestPush = prototype
	manifestPush.ID = "asdf-asdf-asdf-asdf-0"
	manifestPush.Target.Digest = "sha256:0123456789abcdef0"
	manifestPush.Target.Length = 1
	manifestPush.Target.Size = 1
	manifestPush.Target.MediaType = schema1.MediaTypeSignedManifest
	manifestPush.Target.Repository = "library/test"
	manifestPush.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var layerPush0 = prototype
	layerPush0.ID = "asdf-asdf-asdf-asdf-1"
	layerPush0.Target.Digest = "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5"
	layerPush0.Target.Length = 2
	layerPush0.Target.Size = 2
	layerPush0.Target.MediaType = layerMediaType
	layerPush0.Target.Repository = "library/test"
	layerPush0.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var layerPush1 = prototype
	layerPush1.ID = "asdf-asdf-asdf-asdf-2"
	layerPush1.Target.Digest = "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d6"
	layerPush1.Target.Length = 3
	layerPush1.Target.Size = 3
	layerPush1.Target.MediaType = layerMediaType
	layerPush1.Target.Repository = "library/test"
	layerPush1.Target.URL = "http://example.com/v2/library/test/manifests/latest"

	var envelope Envelope
	envelope.Events = append(envelope.Events, manifestPush, layerPush0, layerPush1)

	p, err := json.MarshalIndent(envelope, "", "   ")
	if err != nil {
		t.Fatalf("unexpected error marshaling envelope: %v", err)
	}
	if string(p) != expected {
		t.Fatalf("format has changed\n%s\n != \n%s", string(p), expected)
	}
}
---- notifications/http.go ----
package notifications

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	events "github.com/docker/go-events"
)

// httpSink implements a single-flight, http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
	url string

	mu        sync.Mutex
	closed    bool
	client    *http.Client
	listeners []httpStatusListener

	// TODO(stevvooe): Allow one to configure the media type accepted by this
	// sink and choose the serialization based on that.
}

// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
	if transport == nil {
		transport = http.DefaultTransport.(*http.Transport)
	}
	return &httpSink{
		url:       u,
		listeners: listeners,
		client: &http.Client{
			Transport: &headerRoundTripper{
				Transport: transport,
				headers:   headers,
			},
			Timeout: timeout,
		},
	}
}
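
// A minimal sketch of direct use; the URL and header values are illustrative.
// Retries and queueing must be layered on top, as NewEndpoint does:
//
//	hdr := http.Header{"Authorization": []string{"Bearer example-token"}}
//	sink := newHTTPSink("http://listener.example.test/events", time.Second, hdr, nil)
//	defer sink.Close()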

// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
	success(status int, event events.Event)
	failure(status int, events events.Event)
	err(err error, events events.Event)
}

// Write makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The event is
// wrapped in a single-event envelope and accepted or rejected as a whole.
func (hs *httpSink) Write(event events.Event) error {
	hs.mu.Lock()
	defer hs.mu.Unlock()
	defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()

	if hs.closed {
		return ErrSinkClosed
	}

	envelope := Envelope{
		Events: []events.Event{event},
	}

	// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
	// retry but we are going to do it to keep the code simple. It is likely
	// we could change the event struct to manage its own buffer.

	p, err := json.MarshalIndent(envelope, "", "   ")
	if err != nil {
		for _, listener := range hs.listeners {
			listener.err(err, event)
		}
		return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
	}

	body := bytes.NewReader(p)
	resp, err := hs.client.Post(hs.url, EventsMediaType, body)
	if err != nil {
		for _, listener := range hs.listeners {
			listener.err(err, event)
		}

		return fmt.Errorf("%v: error posting: %v", hs, err)
	}
	defer resp.Body.Close()

	// The notifier will treat any 2xx or 3xx response as accepted by the
	// endpoint.
	switch {
	case resp.StatusCode >= 200 && resp.StatusCode < 400:
		for _, listener := range hs.listeners {
			listener.success(resp.StatusCode, event)
		}

		// TODO(stevvooe): This is a little accepting: we may want to support
		// unsupported media type responses with retries using the correct
		// media type. There may also be cases that will never work.

		return nil
	default:
		for _, listener := range hs.listeners {
			listener.failure(resp.StatusCode, event)
		}
		return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
	}
}

// Close closes the sink; subsequent writes will fail with ErrSinkClosed.
func (hs *httpSink) Close() error {
	hs.mu.Lock()
	defer hs.mu.Unlock()

	if hs.closed {
		return fmt.Errorf("httpsink: already closed")
	}

	hs.closed = true
	return nil
}

func (hs *httpSink) String() string {
	return fmt.Sprintf("httpSink{%s}", hs.url)
}

type headerRoundTripper struct {
	*http.Transport // must be transport to support CancelRequest
	headers         http.Header
}

func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	var nreq = *req
	nreq.Header = make(http.Header)

	merge := func(headers http.Header) {
		for k, v := range headers {
			nreq.Header[k] = append(nreq.Header[k], v...)
		}
	}

	merge(req.Header)
	merge(hrt.headers)

	return hrt.Transport.RoundTrip(&nreq)
}
---- notifications/http_test.go ----
package notifications

import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"mime"
	"net"
	"net/http"
	"net/http/httptest"
	"reflect"
	"strconv"
	"strings"
	"testing"

	"github.com/distribution/distribution/v3/manifest/schema1"
	events "github.com/docker/go-events"
)

// TestHTTPSink mocks out an http endpoint and notifies it under a couple of
// conditions, ensuring correct behavior.
func TestHTTPSink(t *testing.T) {
	serverHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()
		if r.Method != "POST" {
			w.WriteHeader(http.StatusMethodNotAllowed)
			t.Fatalf("unexpected request method: %v", r.Method)
			return
		}

		// Extract the content type and make sure it matches
		contentType := r.Header.Get("Content-Type")
		mediaType, _, err := mime.ParseMediaType(contentType)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType)
			return
		}

		if mediaType != EventsMediaType {
			w.WriteHeader(http.StatusUnsupportedMediaType)
			t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType)
			return
		}

		var envelope Envelope
		dec := json.NewDecoder(r.Body)
		if err := dec.Decode(&envelope); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			t.Fatalf("error decoding request body: %v", err)
			return
		}

		// Let caller choose the status
		status, err := strconv.Atoi(r.FormValue("status"))
		if err != nil {
			t.Logf("error parsing status: %v", err)

			// May just be empty, set status to 200
			status = http.StatusOK
		}

		w.WriteHeader(status)
	})
	server := httptest.NewTLSServer(serverHandler)

	metrics := newSafeMetrics("")
	sink := newHTTPSink(server.URL, 0, nil, nil,
		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})

	// first make sure that the default transport gives x509 untrusted cert error
	event := Event{}
	err := sink.Write(event)
	if err == nil || (!strings.Contains(err.Error(), "x509") && !strings.Contains(err.Error(), "unknown ca")) {
		t.Fatal("TLS server with default transport should give unknown CA error")
	}
	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing http sink: %v", err)
	}

	// make sure that passing in the transport no longer gives this error
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	sink = newHTTPSink(server.URL, 0, nil, tr,
		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
	err = sink.Write(event)
	if err != nil {
		t.Fatalf("unexpected error writing event: %v", err)
	}

	// reset server to standard http server and sink to a basic sink
	metrics = newSafeMetrics("")
	server = httptest.NewServer(serverHandler)
	sink = newHTTPSink(server.URL, 0, nil, nil,
		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
	var expectedMetrics EndpointMetrics
	expectedMetrics.Statuses = make(map[string]int)

	closeL, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("unexpected error creating listener: %v", err)
	}
	defer closeL.Close()
	go func() {
		for {
			c, err := closeL.Accept()
			if err != nil {
				return
			}
			c.Close()
		}
	}()

	for _, tc := range []struct {
		event      events.Event // events to send
		url        string
		isFailure  bool // true if there should be a failure.
		isError    bool // true if the request returns an error
		statusCode int  // if not set, no status code should be incremented.
	}{
		{
			statusCode: http.StatusOK,
			event:      createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
		},
		{
			statusCode: http.StatusOK,
			event:      createTestEvent("push", "library/test", schema1.MediaTypeSignedManifest),
		},
		{
			statusCode: http.StatusOK,
			event:      createTestEvent("push", "library/test", layerMediaType),
		},
		{
			statusCode: http.StatusOK,
			event:      createTestEvent("push", "library/test", layerMediaType),
		},
		{
			statusCode: http.StatusTemporaryRedirect,
		},
		{
			statusCode: http.StatusBadRequest,
			isFailure:  true,
		},
		{
			// Case where connection is immediately closed
			url:     "http://" + closeL.Addr().String(),
			isError: true,
		},
	} {

		if tc.isFailure {
			expectedMetrics.Failures++
		} else if tc.isError {
			expectedMetrics.Errors++
		} else {
			expectedMetrics.Successes++
		}

		if tc.statusCode > 0 {
			expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))]++
		}

		url := tc.url
		if url == "" {
			url = server.URL + "/"
		}
		// setup endpoint to respond with expected status code.
		url += fmt.Sprintf("?status=%v", tc.statusCode)
		sink.url = url

		t.Logf("testcase: %v, fail=%v, error=%v", url, tc.isFailure, tc.isError)
		// Try a simple event emission.
		err := sink.Write(tc.event)

		if !tc.isFailure && !tc.isError {
			if err != nil {
				t.Fatalf("unexpected error send event: %v", err)
			}
		} else {
			if err == nil {
				t.Fatalf("the endpoint should have rejected the request")
			}
			t.Logf("write error: %v", err)
		}

		if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) {
			t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics)
		}
	}

	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing http sink: %v", err)
	}

	// double close returns error
	if err := sink.Close(); err == nil {
		t.Fatalf("second close should have returned an error")
	}
}

func createTestEvent(action, repo, typ string) Event {
	event := createEvent(action)

	event.Target.MediaType = typ
	event.Target.Repository = repo

	return *event
}
---- notifications/listener.go ----
package notifications

import (
	"context"
	"net/http"

	"github.com/distribution/distribution/v3"

	dcontext "github.com/distribution/distribution/v3/context"
	"github.com/distribution/distribution/v3/reference"
	"github.com/opencontainers/go-digest"
)

// ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface {
	ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
	ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
	ManifestDeleted(repo reference.Named, dgst digest.Digest) error
}

// BlobListener describes a listener that can respond to layer related events.
type BlobListener interface {
	BlobPushed(repo reference.Named, desc distribution.Descriptor) error
	BlobPulled(repo reference.Named, desc distribution.Descriptor) error
	BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error
	BlobDeleted(repo reference.Named, desc digest.Digest) error
}

// RepoListener provides methods that respond to repository lifecycle events.
type RepoListener interface {
	TagDeleted(repo reference.Named, tag string) error
	RepoDeleted(repo reference.Named) error
}

// Listener combines all repository events into a single interface.
type Listener interface {
	ManifestListener
	BlobListener
	RepoListener
}

type repositoryListener struct {
	distribution.Repository
	listener Listener
}

type removerListener struct {
	distribution.RepositoryRemover
	listener Listener
}

// Listen dispatches events on the repository to the listener.
func Listen(repo distribution.Repository, remover distribution.RepositoryRemover, listener Listener) (distribution.Repository, distribution.RepositoryRemover) {
	return &repositoryListener{
			Repository: repo,
			listener:   listener,
		}, &removerListener{
			RepositoryRemover: remover,
			listener:          listener,
		}
}
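
// A minimal usage sketch, assuming repo and remover come from the storage
// package and listener from NewBridge:
//
//	repo, remover = Listen(repo, remover, listener)
//	// Manifest, blob, and tag operations on repo now notify listener.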

func (nl *removerListener) Remove(ctx context.Context, name reference.Named) error {
	err := nl.RepositoryRemover.Remove(ctx, name)
	if err != nil {
		return err
	}
	return nl.listener.RepoDeleted(name)
}

func (rl *repositoryListener) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
	manifests, err := rl.Repository.Manifests(ctx, options...)
	if err != nil {
		return nil, err
	}
	return &manifestServiceListener{
		ManifestService: manifests,
		parent:          rl,
	}, nil
}

func (rl *repositoryListener) Blobs(ctx context.Context) distribution.BlobStore {
	return &blobServiceListener{
		BlobStore: rl.Repository.Blobs(ctx),
		parent:    rl,
	}
}

type manifestServiceListener struct {
	distribution.ManifestService
	parent *repositoryListener
}

func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
	err := msl.ManifestService.Delete(ctx, dgst)
	if err == nil {
		if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching manifest delete to listener: %v", err)
		}
	}

	return err
}

func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
	sm, err := msl.ManifestService.Get(ctx, dgst, options...)
	if err == nil {
		if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching manifest pull to listener: %v", err)
		}
	}

	return sm, err
}

func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
	dgst, err := msl.ManifestService.Put(ctx, sm, options...)

	if err == nil {
		if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching manifest push to listener: %v", err)
		}
	}

	return dgst, err
}

type blobServiceListener struct {
	distribution.BlobStore
	parent *repositoryListener
}

var _ distribution.BlobStore = &blobServiceListener{}

func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
	p, err := bsl.BlobStore.Get(ctx, dgst)
	if err == nil {
		if desc, err := bsl.Stat(ctx, dgst); err != nil {
			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
		} else {
			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
			}
		}
	}

	return p, err
}

func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
	rc, err := bsl.BlobStore.Open(ctx, dgst)
	if err == nil {
		if desc, err := bsl.Stat(ctx, dgst); err != nil {
			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
		} else {
			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
			}
		}
	}

	return rc, err
}

func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
	err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
	if err == nil {
		if desc, err := bsl.Stat(ctx, dgst); err != nil {
			dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
		} else {
			if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
				dcontext.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
			}
		}
	}

	return err
}

func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
	desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
	if err == nil {
		if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Named(), desc); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching layer push to listener: %v", err)
		}
	}

	return desc, err
}

func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
	wr, err := bsl.BlobStore.Create(ctx, options...)
	switch err := err.(type) {
	case distribution.ErrBlobMounted:
		if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Named(), err.Descriptor, err.From); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
		}
		return nil, err
	}
	return bsl.decorateWriter(wr), err
}

func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error {
	err := bsl.BlobStore.Delete(ctx, dgst)
	if err == nil {
		if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err)
		}
	}

	return err
}

func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
	wr, err := bsl.BlobStore.Resume(ctx, id)
	return bsl.decorateWriter(wr), err
}

func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
	return &blobWriterListener{
		BlobWriter: wr,
		parent:     bsl,
	}
}

type blobWriterListener struct {
	distribution.BlobWriter
	parent *blobServiceListener
}

func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
	committed, err := bwl.BlobWriter.Commit(ctx, desc)
	if err == nil {
		if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Named(), committed); err != nil {
			dcontext.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
		}
	}

	return committed, err
}

type tagServiceListener struct {
	distribution.TagService
	parent *repositoryListener
}

func (rl *repositoryListener) Tags(ctx context.Context) distribution.TagService {
	return &tagServiceListener{
		TagService: rl.Repository.Tags(ctx),
		parent:     rl,
	}
}

func (tagSL *tagServiceListener) Untag(ctx context.Context, tag string) error {
	if err := tagSL.TagService.Untag(ctx, tag); err != nil {
		return err
	}
	if err := tagSL.parent.listener.TagDeleted(tagSL.parent.Repository.Named(), tag); err != nil {
		dcontext.GetLogger(ctx).Errorf("error dispatching tag deleted to listener: %v", err)
		return err
	}
	return nil
}
---- notifications/listener_test.go ----
package notifications

import (
	"io"
	"reflect"
	"testing"

	"github.com/distribution/distribution/v3"
	"github.com/distribution/distribution/v3/context"
	"github.com/distribution/distribution/v3/manifest"
	"github.com/distribution/distribution/v3/manifest/schema1"
	"github.com/distribution/distribution/v3/reference"
	"github.com/distribution/distribution/v3/registry/storage"
	"github.com/distribution/distribution/v3/registry/storage/cache/memory"
	"github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
	"github.com/distribution/distribution/v3/testutil"
	"github.com/docker/libtrust"
	"github.com/opencontainers/go-digest"
)

func TestListener(t *testing.T) {
	ctx := context.Background()
	k, err := libtrust.GenerateECP256PrivateKey()
	if err != nil {
		t.Fatal(err)
	}

	registry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableDelete, storage.EnableRedirect, storage.Schema1SigningKey(k), storage.EnableSchema1)
	if err != nil {
		t.Fatalf("error creating registry: %v", err)
	}
	tl := &testListener{
		ops: make(map[string]int),
	}

	repoRef, _ := reference.WithName("foo/bar")
	repository, err := registry.Repository(ctx, repoRef)
	if err != nil {
		t.Fatalf("unexpected error getting repo: %v", err)
	}

	remover, ok := registry.(distribution.RepositoryRemover)
	if !ok {
		t.Fatal("registry does not implement RepositoryRemover")
	}
	repository, remover = Listen(repository, remover, tl)

	// Now take the registry through a number of operations
	checkExerciseRepository(t, repository, remover)

	expectedOps := map[string]int{
		"manifest:push":   1,
		"manifest:pull":   1,
		"manifest:delete": 1,
		"layer:push":      2,
		"layer:pull":      2,
		"layer:delete":    2,
		"tag:delete":      1,
		"repo:delete":     1,
	}

	if !reflect.DeepEqual(tl.ops, expectedOps) {
		t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps)
	}
}

type testListener struct {
	ops map[string]int
}

func (tl *testListener) ManifestPushed(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
	tl.ops["manifest:push"]++
	return nil
}

func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
	tl.ops["manifest:pull"]++
	return nil
}

func (tl *testListener) ManifestDeleted(repo reference.Named, d digest.Digest) error {
	tl.ops["manifest:delete"]++
	return nil
}

func (tl *testListener) BlobPushed(repo reference.Named, desc distribution.Descriptor) error {
	tl.ops["layer:push"]++
	return nil
}

func (tl *testListener) BlobPulled(repo reference.Named, desc distribution.Descriptor) error {
	tl.ops["layer:pull"]++
	return nil
}

func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error {
	tl.ops["layer:mount"]++
	return nil
}

func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error {
	tl.ops["layer:delete"]++
	return nil
}

func (tl *testListener) TagDeleted(repo reference.Named, tag string) error {
	tl.ops["tag:delete"]++
	return nil
}

func (tl *testListener) RepoDeleted(repo reference.Named) error {
	tl.ops["repo:delete"]++
	return nil
}

// checkExerciseRepository takes the repository through all of its operations,
// carrying out generic checks.
func checkExerciseRepository(t *testing.T, repository distribution.Repository, remover distribution.RepositoryRemover) {
	// TODO(stevvooe): This would be a nice testutil function. Basically, it
	// takes the registry through a common set of operations. This could be
	// used to make cross-cutting updates by changing internals that affect
	// update counts. Basically, it would make writing tests a lot easier.

	ctx := context.Background()
	tag := "thetag"
	// todo: change this to use Builder

	m := schema1.Manifest{
		Versioned: manifest.Versioned{
			SchemaVersion: 1,
		},
		Name: repository.Named().Name(),
		Tag:  tag,
	}

	var blobDigests []digest.Digest
	blobs := repository.Blobs(ctx)
	for i := 0; i < 2; i++ {
		rs, dgst, err := testutil.CreateRandomTarFile()
		if err != nil {
			t.Fatalf("error creating test layer: %v", err)
		}
		blobDigests = append(blobDigests, dgst)

		wr, err := blobs.Create(ctx)
		if err != nil {
			t.Fatalf("error creating layer upload: %v", err)
		}

		// Use the resumes, as well!
		wr, err = blobs.Resume(ctx, wr.ID())
		if err != nil {
			t.Fatalf("error resuming layer upload: %v", err)
		}

		if _, err := io.Copy(wr, rs); err != nil {
			t.Fatalf("unexpected error copying layer data: %v", err)
		}

		if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
			t.Fatalf("unexpected error finishing upload: %v", err)
		}

		m.FSLayers = append(m.FSLayers, schema1.FSLayer{
			BlobSum: dgst,
		})
		m.History = append(m.History, schema1.History{
			V1Compatibility: "",
		})

		// Then fetch the blobs
		if rc, err := blobs.Open(ctx, dgst); err != nil {
			t.Fatalf("error fetching layer: %v", err)
		} else {
			defer rc.Close()
		}
	}

	pk, err := libtrust.GenerateECP256PrivateKey()
	if err != nil {
		t.Fatalf("unexpected error generating key: %v", err)
	}

	sm, err := schema1.Sign(&m, pk)
	if err != nil {
		t.Fatalf("unexpected error signing manifest: %v", err)
	}

	manifests, err := repository.Manifests(ctx)
	if err != nil {
		t.Fatal(err.Error())
	}

	var digestPut digest.Digest
	if digestPut, err = manifests.Put(ctx, sm); err != nil {
		t.Fatalf("unexpected error putting the manifest: %v", err)
	}

	dgst := digest.FromBytes(sm.Canonical)
	if dgst != digestPut {
		t.Fatalf("mismatching digest from payload and put")
	}

	if err := repository.Tags(ctx).Tag(ctx, tag, distribution.Descriptor{Digest: dgst}); err != nil {
		t.Fatalf("unexpected error tagging manifest: %v", err)
	}

	_, err = manifests.Get(ctx, dgst)
	if err != nil {
		t.Fatalf("unexpected error fetching manifest: %v", err)
	}

	err = repository.Tags(ctx).Untag(ctx, m.Tag)
	if err != nil {
		t.Fatalf("unexpected error deleting tag: %v", err)
	}

	err = manifests.Delete(ctx, dgst)
	if err != nil {
		t.Fatalf("unexpected error deleting blob: %v", err)
	}

	for _, d := range blobDigests {
		err = blobs.Delete(ctx, d)
		if err != nil {
			t.Fatalf("unexpected error deleting blob: %v", err)
		}
	}

	err = remover.Remove(ctx, repository.Named())
	if err != nil {
		t.Fatalf("unexpected error deleting repo: %v", err)
	}
}
---- notifications/metrics.go ----
package notifications

import (
	"expvar"
	"fmt"
	"net/http"
	"sync"

	prometheus "github.com/distribution/distribution/v3/metrics"
	events "github.com/docker/go-events"
	"github.com/docker/go-metrics"
)

var (
	// eventsCounter counts total events of incoming, success, failure, and errors
	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "endpoint")
	// pendingGauge measures the pending queue size
	pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "endpoint")
	// statusCounter counts the total notification call per each status code
	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "endpoint")
)

// EndpointMetrics track various actions taken by the endpoint, typically by
// number of events. The goal of this is to export it via expvar, but we may
// find some other future solution to be better.
type EndpointMetrics struct {
	Pending   int            // events pending in queue
	Events    int            // total events incoming
	Successes int            // total events written successfully
	Failures  int            // total events failed
	Errors    int            // total events errored
	Statuses  map[string]int // status code histogram, per call event
}

// safeMetrics guards the metrics implementation with a lock and provides a
// safe update function.
type safeMetrics struct {
	EndpointName string
	EndpointMetrics
	sync.Mutex // protects statuses map
}

// newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics(name string) *safeMetrics {
	var sm safeMetrics
	sm.Statuses = make(map[string]int)
	sm.EndpointName = name
	return &sm
}

// httpStatusListener returns the listener for the http sink that updates the
// relevant counters.
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
	return &endpointMetricsHTTPStatusListener{
		safeMetrics: sm,
	}
}

// eventQueueListener returns a listener that maintains queue related counters.
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
	return &endpointMetricsEventQueueListener{
		safeMetrics: sm,
	}
}

// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevant events.
type endpointMetricsHTTPStatusListener struct {
	*safeMetrics
}

var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}

func (emsl *endpointMetricsHTTPStatusListener) success(status int, event events.Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
	emsl.Successes++

	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
	eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
}

func (emsl *endpointMetricsHTTPStatusListener) failure(status int, event events.Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))]++
	emsl.Failures++

	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
	eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
}

func (emsl *endpointMetricsHTTPStatusListener) err(err error, event events.Event) {
	emsl.safeMetrics.Lock()
	defer emsl.safeMetrics.Unlock()
	emsl.Errors++

	eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
}

// endpointMetricsEventQueueListener maintains the incoming events counter and
// the queues pending count.
type endpointMetricsEventQueueListener struct {
	*safeMetrics
}

func (eqc *endpointMetricsEventQueueListener) ingress(event events.Event) {
	eqc.Lock()
	defer eqc.Unlock()
	eqc.Events++
	eqc.Pending++

	eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
	pendingGauge.WithValues(eqc.EndpointName).Inc(1)
}

func (eqc *endpointMetricsEventQueueListener) egress(event events.Event) {
	eqc.Lock()
	defer eqc.Unlock()
	eqc.Pending--

	pendingGauge.WithValues(eqc.EndpointName).Dec(1)
}

// endpoints is a global registry of endpoints used to report metrics to expvar
var endpoints struct {
	registered []*Endpoint
	mu         sync.Mutex
}

// register places the endpoint into expvar so that stats are tracked.
func register(e *Endpoint) {
	endpoints.mu.Lock()
	defer endpoints.mu.Unlock()

	endpoints.registered = append(endpoints.registered, e)
}

func init() {
	// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
	// Ideally, we do more metrics through logging but we need some nice
	// realtime metrics for queue state for now.

	registry := expvar.Get("registry")

	if registry == nil {
		registry = expvar.NewMap("registry")
	}

	var notifications expvar.Map
	notifications.Init()
	notifications.Set("endpoints", expvar.Func(func() interface{} {
		endpoints.mu.Lock()
		defer endpoints.mu.Unlock()

		var names []interface{}
		for _, v := range endpoints.registered {
			var epjson struct {
				Name string `json:"name"`
				URL  string `json:"url"`
				EndpointConfig

				Metrics EndpointMetrics
			}

			epjson.Name = v.Name()
			epjson.URL = v.URL()
			epjson.EndpointConfig = v.EndpointConfig

			v.ReadMetrics(&epjson.Metrics)

			names = append(names, epjson)
		}

		return names
	}))

	registry.(*expvar.Map).Set("notifications", &notifications)

	// register prometheus metrics
	metrics.Register(prometheus.NotificationsNamespace)
}
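
// As a rough usage sketch, the structure published above can be read back
// through the standard expvar API (mirroring what the test below does);
// processes that also mount expvar's HTTP handler expose the same data at
// /debug/vars:
//
//	registry := expvar.Get("registry").(*expvar.Map)
//	eps := registry.Get("notifications").(*expvar.Map).Get("endpoints")
//	fmt.Println(eps.String()) // JSON array of endpoints with their metrics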
0707010000000a000081a400000000000000000000000163283048000002e9000000000000000000000000000000000000001e00000000notifications/metrics_test.gopackage notifications

import (
	"encoding/json"
	"expvar"
	"testing"
)

func TestMetricsExpvar(t *testing.T) {
	endpointsVar := expvar.Get("registry").(*expvar.Map).Get("notifications").(*expvar.Map).Get("endpoints")

	var v interface{}
	if err := json.Unmarshal([]byte(endpointsVar.String()), &v); err != nil {
		t.Fatalf("unexpected error unmarshaling endpoints: %v", err)
	}
	if v != nil {
		t.Fatalf("expected nil, got %#v", v)
	}

	NewEndpoint("x", "y", EndpointConfig{})

	if err := json.Unmarshal([]byte(endpointsVar.String()), &v); err != nil {
		t.Fatalf("unexpected error unmarshaling endpoints: %v", err)
	}
	// Only log on mismatch: other tests in the package may also register
	// endpoints, so the exact count here can be order-dependent.
	if slice, ok := v.([]interface{}); !ok || len(slice) != 1 {
		t.Logf("expected one-element []interface{}, got %#v", v)
	}
}
0707010000000b000081a40000000000000000000000016328304800000ec3000000000000000000000000000000000000001700000000notifications/sinks.gopackage notifications

import (
	"container/list"
	"fmt"
	"sync"

	events "github.com/docker/go-events"
	"github.com/sirupsen/logrus"
)

// eventQueue accepts all messages into a queue for asynchronous consumption
// by a sink. It is unbounded and thread-safe, but the sink must be reliable
// or events will be dropped.
type eventQueue struct {
	sink      events.Sink
	events    *list.List
	listeners []eventQueueListener
	cond      *sync.Cond
	mu        sync.Mutex
	closed    bool
}

// eventQueueListener is called when various events happen on the queue.
type eventQueueListener interface {
	ingress(event events.Event)
	egress(event events.Event)
}

// newEventQueue returns a queue to the provided sink. If listeners are
// provided, they are notified of each event on ingress and egress.
func newEventQueue(sink events.Sink, listeners ...eventQueueListener) *eventQueue {
	eq := eventQueue{
		sink:      sink,
		events:    list.New(),
		listeners: listeners,
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}
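
// A minimal usage sketch (assuming some already-configured events.Sink
// named sink):
//
//	metrics := newSafeMetrics("endpoint-name") // illustrative name
//	eq := newEventQueue(sink, metrics.eventQueueListener())
//	_ = eq.Write(Event{Action: EventActionPush}) // queued; flushed by run()
//	_ = eq.Close()                               // flushes, then closes sink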

// Write accepts an event into the queue, failing only if the queue has
// been closed.
func (eq *eventQueue) Write(event events.Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return ErrSinkClosed
	}

	for _, listener := range eq.listeners {
		listener.ingress(event)
	}
	eq.events.PushBack(event)
	eq.cond.Signal() // signal waiters

	return nil
}

// Close shuts down the event queue, flushing all pending events to the
// sink before closing it.
func (eq *eventQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return fmt.Errorf("eventqueue: already closed")
	}

	// set closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush

	return eq.sink.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *eventQueue) run() {
	for {
		event := eq.next()

		if event == nil {
			return // nil event means the queue is closed.
		}

		if err := eq.sink.Write(event); err != nil {
			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
		}

		for _, listener := range eq.listeners {
			listener.egress(event)
		}
	}
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return an event. When the queue is closed, nil is returned.
func (eq *eventQueue) next() events.Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast() // wake Close, which waits for the final flush
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	event := front.Value.(events.Event)
	eq.events.Remove(front)

	return event
}

// ignoredSink discards events with ignored target media types and actions,
// passing the rest along.
type ignoredSink struct {
	events.Sink
	ignoreMediaTypes map[string]bool
	ignoreActions    map[string]bool
}

func newIgnoredSink(sink events.Sink, ignored []string, ignoreActions []string) events.Sink {
	// Bypass filtering only when there is nothing at all to ignore; a
	// non-empty action list alone must still install the filter.
	if len(ignored) == 0 && len(ignoreActions) == 0 {
		return sink
	}

	ignoredMap := make(map[string]bool)
	for _, mediaType := range ignored {
		ignoredMap[mediaType] = true
	}

	ignoredActionsMap := make(map[string]bool)
	for _, action := range ignoreActions {
		ignoredActionsMap[action] = true
	}

	return &ignoredSink{
		Sink:             sink,
		ignoreMediaTypes: ignoredMap,
		ignoreActions:    ignoredActionsMap,
	}
}
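
// For example (values illustrative), dropping all pull events as well as any
// event targeting the listed media type:
//
//	s := newIgnoredSink(sink, []string{"application/octet-stream"}, []string{"pull"})
//	_ = s.Write(event) // silently discarded when it matches either filter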

// Write discards events with ignored target media types or actions and
// passes the rest along.
func (imts *ignoredSink) Write(event events.Event) error {
	if imts.ignoreMediaTypes[event.(Event).Target.MediaType] || imts.ignoreActions[event.(Event).Action] {
		return nil
	}

	return imts.Sink.Write(event)
}

// Close is a no-op for the filtering wrapper; it does not close the
// underlying sink.
func (imts *ignoredSink) Close() error {
	return nil
}
0707010000000c000081a40000000000000000000000016328304800000f22000000000000000000000000000000000000001c00000000notifications/sinks_test.gopackage notifications

import (
	"reflect"
	"sync"
	"time"

	events "github.com/docker/go-events"

	"github.com/sirupsen/logrus"

	"testing"
)

func TestEventQueue(t *testing.T) {
	const nevents = 1000
	var ts testSink
	metrics := newSafeMetrics("")
	eq := newEventQueue(
		// the delayed sink simulates a destination slower than channel comms
		&delayedSink{
			Sink:  &ts,
			delay: time.Millisecond * 1,
		}, metrics.eventQueueListener())

	var wg sync.WaitGroup
	var event events.Event
	for i := 1; i <= nevents; i++ {
		event = createTestEvent("push", "library/test", "blob")
		wg.Add(1)
		go func(event events.Event) {
			if err := eq.Write(event); err != nil {
				t.Errorf("error writing event block: %v", err)
			}
			wg.Done()
		}(event)
	}

	wg.Wait()
	if t.Failed() {
		t.FailNow()
	}
	checkClose(t, eq)

	ts.mu.Lock()
	defer ts.mu.Unlock()
	metrics.Lock()
	defer metrics.Unlock()

	if ts.count != nevents {
		t.Fatalf("events did not make it to the sink: %d != %d", ts.count, 1000)
	}

	if !ts.closed {
		t.Fatalf("sink should have been closed")
	}

	if metrics.Events != nevents {
		t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
	}

	if metrics.Pending != 0 {
		t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
	}
}

func TestIgnoredSink(t *testing.T) {
	blob := createTestEvent("push", "library/test", "blob")
	manifest := createTestEvent("pull", "library/test", "manifest")

	type testcase struct {
		ignoreMediaTypes []string
		ignoreActions    []string
		expected         events.Event
	}

	cases := []testcase{
		{nil, nil, blob},
		{[]string{"other"}, []string{"other"}, blob},
		{[]string{"blob", "manifest"}, []string{"other"}, nil},
		{[]string{"other"}, []string{"pull"}, blob},
		{[]string{"other"}, []string{"pull", "push"}, nil},
	}

	for _, c := range cases {
		ts := &testSink{}
		s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)

		if err := s.Write(blob); err != nil {
			t.Fatalf("error writing event: %v", err)
		}

		ts.mu.Lock()
		if !reflect.DeepEqual(ts.event, c.expected) {
			t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected)
		}
		ts.mu.Unlock()
	}

	cases = []testcase{
		{nil, nil, manifest},
		{[]string{"other"}, []string{"other"}, manifest},
		{[]string{"blob"}, []string{"other"}, manifest},
		{[]string{"blob", "manifest"}, []string{"other"}, nil},
		{[]string{"other"}, []string{"push"}, manifest},
		{[]string{"other"}, []string{"pull", "push"}, nil},
	}

	for _, c := range cases {
		ts := &testSink{}
		s := newIgnoredSink(ts, c.ignoreMediaTypes, c.ignoreActions)

		if err := s.Write(manifest); err != nil {
			t.Fatalf("error writing event: %v", err)
		}

		ts.mu.Lock()
		if !reflect.DeepEqual(ts.event, c.expected) {
			t.Fatalf("unexpected event: %#v != %#v", ts.event, c.expected)
		}
		ts.mu.Unlock()
	}
}

type testSink struct {
	event  events.Event
	count  int
	mu     sync.Mutex
	closed bool
}

func (ts *testSink) Write(event events.Event) error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.event = event
	ts.count++
	return nil
}

func (ts *testSink) Close() error {
	ts.mu.Lock()
	defer ts.mu.Unlock()
	ts.closed = true

	logrus.Infof("closing testSink")
	return nil
}

type delayedSink struct {
	events.Sink
	delay time.Duration
}

func (ds *delayedSink) Write(event events.Event) error {
	time.Sleep(ds.delay)
	return ds.Sink.Write(event)
}

func checkClose(t *testing.T, sink events.Sink) {
	if err := sink.Close(); err != nil {
		t.Fatalf("unexpected error closing: %v", err)
	}

	// second close should not crash but should return an error.
	if err := sink.Close(); err == nil {
		t.Fatalf("no error on double close")
	}

	// Write after closed should be an error
	if err := sink.Write(Event{}); err == nil {
		t.Fatalf("write after closed did not have an error")
	} else if err != ErrSinkClosed {
		t.Fatalf("error should be ErrSinkClosed")
	}
}
0707010000000d000041ed0000000000000000000000016328304800000000000000000000000000000000000000000000000e00000000notifications07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000b00000000TRAILER!!!