File krelay-0.1.3.obscpio of Package krelay
07070100000000000081A400000000000000000000000168B36E500000000C000000000000000000000000000000000000001B00000000krelay-0.1.3/.dockerignoredist/
.git/
07070100000001000081A400000000000000000000000168B36E5000000307000000000000000000000000000000000000001C00000000krelay-0.1.3/.golangci.yamlversion: "2"
run:
modules-download-mode: readonly
linters:
enable:
- copyloopvar
- dupl
- gochecknoinits
- goconst
- gocritic
- misspell
- nolintlint
- prealloc
- revive
- testifylint
- unconvert
- unparam
- usestdlibvars
- whitespace
settings:
govet:
enable:
- nilness
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- errcheck
path: _test.go
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gofmt
- goimports
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
07070100000002000081A400000000000000000000000168B36E50000002D0000000000000000000000000000000000000001E00000000krelay-0.1.3/.goreleaser.yamlversion: 2
before:
hooks:
- go mod tidy
builds:
- env:
- CGO_ENABLED=0
main: ./cmd/client
binary: kubectl-relay
goos:
- windows
- darwin
- linux
goarch:
- amd64
- arm64
ignore:
- goos: windows
goarch: arm64
gcflags:
- all=-trimpath
archives:
-
name_template: 'kubectl-relay_v{{ .Version }}_{{ .Os }}-{{ .Arch }}'
checksum:
name_template: 'checksums.txt'
snapshot:
version_template: "{{ incpatch .Version }}-next"
changelog:
use: github-native
# modelines, feel free to remove those if you don't want/use them:
# yaml-language-server: $schema=https://goreleaser.com/static/schema.json
# vim: set ts=2 sw=2 tw=0 fo=cnqoj
07070100000003000081A400000000000000000000000168B36E5000000773000000000000000000000000000000000000001800000000krelay-0.1.3/.krew.yamlapiVersion: krew.googlecontainertools.github.com/v1alpha2
kind: Plugin
metadata:
name: relay
spec:
version: {{ .TagName }}
homepage: https://github.com/knight42/krelay
shortDescription: Drop-in "port-forward" replacement with UDP and hostname resolution.
description: |
This kubectl plugin is a drop-in replacement for `kubectl port-forward` with some enhanced features:
* Supports UDP port forwarding
* Supports simultaneous forwarding of data to multiple targets
* Forwarding session will not be interfered after performing rolling updates
* Forwarding data to the given IP or hostname that is accessible within the kubernetes cluster
platforms:
- selector:
matchLabels:
os: darwin
arch: amd64
{{addURIAndSha "https://github.com/knight42/krelay/releases/download/{{ .TagName }}/kubectl-relay_{{ .TagName }}_darwin-amd64.tar.gz" .TagName }}
bin: kubectl-relay
- selector:
matchLabels:
os: darwin
arch: arm64
{{addURIAndSha "https://github.com/knight42/krelay/releases/download/{{ .TagName }}/kubectl-relay_{{ .TagName }}_darwin-arm64.tar.gz" .TagName }}
bin: kubectl-relay
- selector:
matchLabels:
os: linux
arch: amd64
{{addURIAndSha "https://github.com/knight42/krelay/releases/download/{{ .TagName }}/kubectl-relay_{{ .TagName }}_linux-amd64.tar.gz" .TagName }}
bin: kubectl-relay
- selector:
matchLabels:
os: linux
arch: arm64
{{addURIAndSha "https://github.com/knight42/krelay/releases/download/{{ .TagName }}/kubectl-relay_{{ .TagName }}_linux-arm64.tar.gz" .TagName }}
bin: kubectl-relay
- selector:
matchLabels:
os: windows
arch: amd64
{{addURIAndSha "https://github.com/knight42/krelay/releases/download/{{ .TagName }}/kubectl-relay_{{ .TagName }}_windows-amd64.tar.gz" .TagName }}
bin: kubectl-relay.exe
07070100000004000081A400000000000000000000000168B36E500000042A000000000000000000000000000000000000001500000000krelay-0.1.3/LICENSEMIT License
Copyright (c) 2021 Jian Zeng
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
07070100000005000081A400000000000000000000000168B36E5000000287000000000000000000000000000000000000001600000000krelay-0.1.3/MakefileIMAGE_TAG ?= latest
NAME := kubectl-relay
GOBUILD = CGO_ENABLED=0 go build -trimpath
.PHONY: server-image push-server-image
server-image:
docker build -t ghcr.io/knight42/krelay-server:$(IMAGE_TAG) -f manifests/Dockerfile-server .
push-server-image: server-image
docker push ghcr.io/knight42/krelay-server:$(IMAGE_TAG)
.PHONY: krelay
krelay:
$(GOBUILD) -o krelay ./cmd/client
.PHONY: lint
lint:
golangci-lint run
.PHONY: test
test:
go test -race -v ./...
.PHONY: coverage
coverage:
go test -race -v -coverprofile=cover.out ./...
go tool cover -html cover.out
rm cover.out
.PHONY: clean
clean:
rm -rf krelay*
rm -rf kubectl-relay*
07070100000006000081A400000000000000000000000168B36E5000001D67000000000000000000000000000000000000001700000000krelay-0.1.3/README.md

[](https://goreportcard.com/report/github.com/knight42/krelay)

# krelay
`krelay` is a drop-in replacement for `kubectl port-forward` with some enhanced features.
## Table of Contents
- [Highlights](#highlights)
- [Demo](#demo)
- [Installation](#installation)
- [Usage](#usage)
- [Flags](#flags)
- [How It Works](#how-it-works)
## ✨Highlights
* Supports UDP port forwarding
* Supports simultaneous forwarding of data to multiple targets.
* Forwarding data to the given IP or hostname that is accessible within the kubernetes cluster
* You could forward a local port to a port in the `Service` or a workload like `Deployment` or `StatefulSet`, and the forwarding session will not be interfered even if you perform rolling updates.
* The hostname is resolved inside the cluster, so you don't need to change your local nameserver or modify the `/etc/hosts`.
## Demo
### Forwarding UDP port
[](https://asciinema.org/a/452745)
### Forwarding traffic to a Service
[](https://asciinema.org/a/452747)
> [!NOTE]
> The forwarding session is not affected after rolling update.
### Forwarding traffic to a IP or hostname
[](https://asciinema.org/a/452749)
### Forwarding traffic to multiple targets
```bash
$ cat > targets.txt <<EOF
# Each line in the file represents a target, the syntax is the same as the command line.
# Empty line or line starts with '#' or '//' will be ignored.
# namespace of the object can be specified by the -n flag
-n kube-system svc/kube-dns 10053:53@udp
# The default namespace is used if no namespace is specified
svc/nginx 8080:80
host/redis.cn-north-1.cache.amazonaws.com 6379
EOF
$ kubectl relay -f targets.txt
```
### Customize the forwarding server
You can provide a merge patch in JSON or YAML format to customize the forwarding server. For instance:
```bash
$ cat patch.yaml
metadata:
generateName: foo-
spec:
nodeSelector:
your-key: your-value
$ kubectl --patch-file patch.yaml svc/nginx 8080:80
```
## Installation
| Distribution | Command / Link |
|---------------------------------------|----------------------------------------------------------------|
| [Krew](https://krew.sigs.k8s.io/) | `kubectl krew install relay` |
| [Homebrew](https://brew.sh/) | `brew install knight42/tap/krelay` |
| Pre-built binaries for macOS, Linux | [GitHub releases](https://github.com/knight42/krelay/releases) |
> [!NOTE]
> If you only have limited access to the cluster, please make sure the permissions specified in [rbac.yaml](./manifests/rbac.yaml)
is granted:
```bash
wget https://raw.githubusercontent.com/knight42/krelay/main/manifests/rbac.yaml
# Edit rbac.yaml to update the user name
vim rbac.yaml
kubectl create -f rbac.yaml
```
### Build from source
```
git clone https://github.com/knight42/krelay
cd krelay
make krelay
cp krelay "$GOPATH/bin/kubectl-relay"
kubectl relay -V
```
## Usage
> [!NOTE]
> Starting from version v0.1.2, `krelay` attempts to tunnel SPDY through websocket, in line with how `kubectl port-forward` works.
>
> This behavior can be disabled by setting the environment variable `KUBECTL_PORT_FORWARD_WEBSOCKETS` to `false`.
```bash
# Listen on port 8080 locally, forwarding data to the port named "http" in the service
kubectl relay service/my-service 8080:http
# Listen on a random port locally, forwarding udp packets to port 53 in a pod selected by the deployment
kubectl relay -n kube-system deploy/kube-dns :53@udp
# Listen on port 5353 on all addresses, forwarding data to port 53 in the pod
kubectl relay --address 0.0.0.0 pod/my-pod 5353:53
# Listen on port 6379 locally, forwarding data to "redis.cn-north-1.cache.amazonaws.com:6379" from the cluster
kubectl relay host/redis.cn-north-1.cache.amazonaws.com 6379
# Listen on port 5000 and 6000 locally, forwarding data to "1.2.3.4:5000" and "1.2.3.4:6000" from the cluster
kubectl relay ip/1.2.3.4 5000@tcp 6000@udp
# Customized the server, and forward local port 5000 to "1.2.3.4:5000"
kubectl relay --patch '{"metadata":{"namespace":"kube-public"},"spec":{"nodeSelector":{"k": "v"}}}' ip/1.2.3.4 5000
```
## Flags
| flag | default | description |
|------------------|-----------------------------------------|-------------------------------------------------------------------------|
| `--address` | `127.0.0.1` | Address to listen on. Only accepts IP addresses as a value. |
| `-f`/`--file` | N/A | Forward traffic to the targets specified in the given file. |
| `--server.image` | `ghcr.io/knight42/krelay-server:v0.0.1` | The krelay-server image to use. |
| `-p`/`--patch` | N/A | The merge patch to be applied to the krelay-server pod. |
| `--patch-file` | N/A | A file containing a merge patch to be applied to the krelay-server pod. |
## How It Works
`krelay` will install an agent(named `krelay-server`) to the kubernetes cluster, and the agent will forward the traffic to the target ip/hostname.
If the target is an object in the cluster, like `Deployment`, `StatefulSet`, `krelay` will automatically select a pod it managed like `kubectl port-forward` does.
After that `krelay` will tell the destination IP(i.e. the pod's IP) and the destination port to the agent by sending a special `Header` first,
and then the data will be forwarded to the agent and sent to the target address.
Specifically, if the target is a `Service`, `krelay` will try to determine the destination address automatically:
* If the `Service` has a clusterIP, then the clusterIP is used as the destination IP.
* If the type of `Service` is `ExternalName`, then the external name is used as the destination address.
* If none of the above scenario is met, then `krelay` will choose a pod selected by this `Service`.
The `Header` looks like this:
| | Version | Header Length | Request ID | Protocol | Destination Port | Address Type | Address |
|------------|---------|---------------|------------|----------|------------------|--------------|----------|
| Byte Count | 1 | 2 | 5 | 1 | 2 | 1 | Variable |
* `Version`: This field is preserved for future extension, and it is not in-use now.
* `Header Length`: The total length of the `Header` in bytes.
* `Request ID`: The ID of the request.
* `Protocol`: The protocol of the request, `0` stands for TCP and `1` stands for UDP.
* `Destination Port`: The destination port of the request.
* `Address Type`: The type of the destination address, `0` stands for IP and `1` stands for hostname.
* `Address`: The destination address of the request:
* 4 bytes for IPv4 address
* 16 bytes for IPv6 address
* Variable bytes for hostname
07070100000007000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001100000000krelay-0.1.3/cmd07070100000008000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001800000000krelay-0.1.3/cmd/client07070100000009000081A400000000000000000000000168B36E5000000226000000000000000000000000000000000000002500000000krelay-0.1.3/cmd/client/conntrack.gopackage main
import (
"sync"
)
type connTrack struct {
mu sync.RWMutex
items map[string]chan []byte
}
func newConnTrack() connTrack {
return connTrack{items: map[string]chan []byte{}}
}
func (c *connTrack) Get(key string) (chan []byte, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
item, ok := c.items[key]
return item, ok
}
func (c *connTrack) Set(key string, item chan []byte) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = item
}
func (c *connTrack) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
0707010000000A000081A400000000000000000000000168B36E5000000DF1000000000000000000000000000000000000002500000000krelay-0.1.3/cmd/client/forwarder.gopackage main
import (
"fmt"
"log/slog"
"net"
"strconv"
"k8s.io/apimachinery/pkg/util/httpstream"
"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)
type portForwarder struct {
addrGetter remoteaddr.Getter
ports ports.PortPair
tcpListener net.Listener
udpListener net.PacketConn
}
func newPortForwarder(addrGetter remoteaddr.Getter, pp ports.PortPair) *portForwarder {
return &portForwarder{
addrGetter: addrGetter,
ports: pp,
}
}
func (p *portForwarder) listen(localIP string) error {
bindAddr := net.JoinHostPort(localIP, strconv.Itoa(int(p.ports.LocalPort)))
switch p.ports.Protocol {
case constants.ProtocolTCP:
l, err := net.Listen(constants.ProtocolTCP, bindAddr)
if err != nil {
return err
}
p.tcpListener = l
case constants.ProtocolUDP:
pc, err := net.ListenPacket(constants.ProtocolUDP, bindAddr)
if err != nil {
return err
}
p.udpListener = pc
default:
return fmt.Errorf("unknown protocol: %s", p.ports.Protocol)
}
return nil
}
func (p *portForwarder) run(streamConn httpstream.Connection) {
switch {
case p.tcpListener != nil:
lis := p.tcpListener
defer lis.Close()
localAddr := lis.Addr().String()
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)
for {
select {
case <-streamConn.CloseChan():
return
default:
}
c, err := lis.Accept()
if err != nil {
l.Error("Fail to accept tcp connection", slogutil.Error(err))
return
}
remoteAddr, err := p.addrGetter.Get()
if err != nil {
l.Error("Fail to get remote address", slogutil.Error(err))
continue
}
go handleTCPConn(c, streamConn, remoteAddr, p.ports.RemotePort)
}
case p.udpListener != nil:
pc := p.udpListener
defer pc.Close()
udpConn := &xnet.UDPConn{UDPConn: pc.(*net.UDPConn)}
localAddr := pc.LocalAddr().String()
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)
track := newConnTrack()
finish := make(chan string)
go func() {
for key := range finish {
track.Delete(key)
l.Debug("Remove udp conn from conntrack table",
slog.String("key", key),
)
}
}()
// https://stackoverflow.com/questions/19658052/strange-behaviour-of-golang-udp-server
_ = udpConn.SetReadBuffer(1048576) // 1 MiB
buf := make([]byte, constants.UDPBufferSize)
for {
select {
case <-streamConn.CloseChan():
return
default:
}
n, cliAddr, err := udpConn.ReadFrom(buf)
if err != nil {
l.Error("Fail to read udp packet",
slogutil.Error(err),
)
return
}
data := make([]byte, n)
copy(data, buf[:n])
key := cliAddr.String()
var dataCh chan []byte
v, ok := track.Get(key)
if !ok {
dataCh = make(chan []byte)
track.Set(key, dataCh)
remoteAddr, err := p.addrGetter.Get()
if err != nil {
l.Error("Fail to get remote address",
slogutil.Error(err),
)
continue
}
go handleUDPConn(udpConn, cliAddr, dataCh, finish, streamConn, remoteAddr, p.ports.RemotePort)
} else {
dataCh = v
}
dataCh <- data
}
}
}
0707010000000B000081A400000000000000000000000168B36E500000006F000000000000000000000000000000000000002600000000krelay-0.1.3/cmd/client/goreleaser.gopackage main
// These variables are set by goreleaser
var (
version string
commit string
date string
)
0707010000000C000081A400000000000000000000000168B36E5000002764000000000000000000000000000000000000002000000000krelay-0.1.3/cmd/client/main.gopackage main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)
type Options struct {
getter genericclioptions.RESTClientGetter
// serverImage is the image to use for the krelay-server.
serverImage string
// address is the address to listen on.
address string
// targetsFile is the file containing the list of targets.
targetsFile string
// patch is the literal MergePatch
patch string
// patchFile is the file containing the MergePatch
patchFile string
verbosity int
}
// setKubernetesDefaults sets default values on the provided client config for accessing the Kubernetes API.
func setKubernetesDefaults(config *rest.Config) {
// GroupVersion is required when initializing a RESTClient
config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
if config.APIPath == "" {
config.APIPath = "/api"
}
// NegotiatedSerializer is required when initializing a RESTClient
if config.NegotiatedSerializer == nil {
// This codec factory ensures the resources are not converted. Therefore, resources
// will not be round-tripped through internal versions. Defaulting does not happen
// on the client.
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
}
}
func (o *Options) newServerPod() (*corev1.Pod, error) {
origPod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
GenerateName: constants.ServerName + "-",
Labels: map[string]string{
"app.kubernetes.io/name": constants.ServerName,
"app": constants.ServerName,
},
Annotations: map[string]string{
"cluster-autoscaler.kubernetes.io/safe-to-evict": "true",
},
},
Spec: corev1.PodSpec{
AutomountServiceAccountToken: toPtr(false),
EnableServiceLinks: toPtr(false),
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: toPtr(true),
},
Containers: []corev1.Container{
{
Name: constants.ServerName,
Image: o.serverImage,
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
ReadOnlyRootFilesystem: toPtr(true),
AllowPrivilegeEscalation: toPtr(false),
},
},
},
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "kubernetes.io/hostname",
WhenUnsatisfiable: corev1.ScheduleAnyway,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": constants.ServerName,
},
},
},
},
},
}
if len(o.patch) == 0 && len(o.patchFile) == 0 {
return &origPod, nil
}
patchBytes := []byte(o.patch)
if len(o.patchFile) > 0 {
var err error
patchBytes, err = os.ReadFile(o.patchFile)
if err != nil {
return nil, fmt.Errorf("read file: %w", err)
}
}
patched, err := patchPod(patchBytes, origPod)
if err != nil {
return nil, fmt.Errorf("patch server pod: %w", err)
}
return patched, nil
}
func (o *Options) Run(ctx context.Context, args []string) error {
ns, _, err := o.getter.ToRawKubeConfigLoader().Namespace()
if err != nil {
return fmt.Errorf("get namespace: %w", err)
}
var targets []target
if len(o.targetsFile) > 0 {
var fin *os.File
if o.targetsFile == "-" {
fin = os.Stdin
} else {
fin, err = os.Open(o.targetsFile)
if err != nil {
return err
}
defer fin.Close()
}
targets, err = parseTargetsFile(fin, ns)
if err != nil {
return err
}
} else {
if len(args) < 2 {
return errors.New("TYPE/NAME and list of ports are required for port-forward")
}
err := validateFields(args)
if err != nil {
return err
}
targets = []target{
{
resource: args[0],
ports: args[1:],
namespace: ns,
},
}
}
restCfg, err := o.getter.ToRESTConfig()
if err != nil {
return err
}
setKubernetesDefaults(restCfg)
cs, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return err
}
var portForwarders []*portForwarder
for _, targetSpec := range targets {
var addrGetter remoteaddr.Getter
parser := ports.NewParser(targetSpec.ports)
resParts := strings.Split(targetSpec.resource, "/")
switch resParts[0] {
case "ip":
remoteAddr, err := xnet.AddrFromIP(resParts[1])
if err != nil {
return err
}
addrGetter = remoteaddr.NewStaticAddr(remoteAddr)
case "host":
addrGetter = remoteaddr.NewStaticAddr(xnet.AddrFromHost(resParts[1]))
default:
obj, err := resource.NewBuilder(o.getter).
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
NamespaceParam(targetSpec.namespace).DefaultNamespace().
ResourceNames("pods", targetSpec.resource).
Do().Object()
if err != nil {
return err
}
addrGetter, err = addrGetterForObject(obj, cs, targetSpec.namespace)
if err != nil {
return err
}
parser = parser.WithObject(obj)
}
forwardPorts, err := parser.Parse()
if err != nil {
return err
}
for _, pp := range forwardPorts {
portForwarders = append(portForwarders, newPortForwarder(addrGetter, pp))
}
}
succeeded := false
for _, pf := range portForwarders {
err := pf.listen(o.address)
if err != nil {
slog.Error("Fail to listen on port", slog.Any("port", pf.ports.LocalPort), slog.Any("error", err))
} else {
succeeded = true
}
}
if !succeeded {
return fmt.Errorf("unable to listen on any of the requested ports")
}
svrPod, err := o.newServerPod()
if err != nil {
return err
}
slog.Info("Creating krelay-server", slog.String("namespace", svrPod.Namespace))
createdPod, err := cs.CoreV1().Pods(svrPod.Namespace).Create(ctx, svrPod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create krelay-server pod: %w", err)
}
svrPodName := createdPod.Name
defer removeServerPod(cs, svrPod.Namespace, svrPodName, time.Minute)
err = ensureServerPodIsRunning(ctx, cs, svrPod.Namespace, svrPodName)
if err != nil {
return fmt.Errorf("ensure krelay-server is running: %w", err)
}
slog.Info("krelay-server is running", slog.String("pod", svrPodName), slog.String("namespace", svrPod.Namespace))
restClient, err := rest.RESTClientFor(restCfg)
if err != nil {
return err
}
req := restClient.Post().
Resource("pods").
Namespace(svrPod.Namespace).Name(svrPodName).
SubResource("portforward")
dialer, err := createDialer(restCfg, req.URL())
if err != nil {
return fmt.Errorf("create dialer: %w", err)
}
streamConn, _, err := dialer.Dial(constants.PortForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
defer streamConn.Close()
for _, pf := range portForwarders {
go pf.run(streamConn)
}
select {
case <-streamConn.CloseChan():
slog.Info("Lost connection to krelay-server pod")
case <-ctx.Done():
}
return nil
}
func main() {
cf := genericclioptions.NewConfigFlags(true)
o := Options{
getter: cf,
}
printVersion := false
fs := flag.NewFlagSet("klog", flag.ExitOnError)
klog.InitFlags(fs)
c := cobra.Command{
Use: fmt.Sprintf(`%s TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT[@PROTOCOL] [...[LOCAL_PORT_N:]REMOTE_PORT_N[@PROTOCOL_N]]`, getProgramName()),
Example: example(),
Long: `This command is similar to "kubectl port-forward", but it also supports UDP and could forward data to a
service, ip and hostname rather than only pods.
Starting from version v0.1.2, it attempts to tunnel SPDY through websocket, in line with how "kubectl port-forward" works.
This behavior can be disabled by setting the environment variable "KUBECTL_PORT_FORWARD_WEBSOCKETS" to "false".`,
RunE: func(cmd *cobra.Command, args []string) error {
if printVersion {
return json.NewEncoder(cmd.OutOrStdout()).Encode(struct {
Version string
BuildDate string
Commit string
}{
Version: version,
BuildDate: date,
Commit: commit,
})
}
_ = fs.Set("v", strconv.Itoa(o.verbosity))
slog.SetLogLoggerLevel(slogutil.MapVerbosityToLogLevel(o.verbosity))
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
return o.Run(ctx, args)
},
SilenceUsage: true,
}
flags := c.Flags()
flags.SortFlags = false
flags.StringVar(cf.KubeConfig, "kubeconfig", *cf.KubeConfig, "Path to the kubeconfig file to use for CLI requests.")
flags.StringVarP(cf.Namespace, "namespace", "n", *cf.Namespace, "If present, the namespace scope for this CLI request")
flags.StringVar(cf.Context, "context", *cf.Context, "The name of the kubeconfig context to use")
flags.StringVar(cf.ClusterName, "cluster", *cf.ClusterName, "The name of the kubeconfig cluster to use")
flags.BoolVarP(&printVersion, "version", "V", false, "Print version info and exit.")
flags.StringVar(&o.address, "address", "127.0.0.1", "Address to listen on. Only accepts IP addresses as a value.")
flags.StringVarP(&o.targetsFile, "file", "f", "", "Forward to the targets specified in the given file, with one target per line.")
flags.IntVarP(&o.verbosity, "v", "v", 3, "Number for the log level verbosity. The bigger the more verbose.")
flags.StringVarP(&o.patch, "patch", "p", "", "The merge patch to be applied to the krelay-server pod.")
flags.StringVar(&o.patchFile, "patch-file", "", "A file containing a merge patch to be applied to the krelay-server pod.")
flags.StringVar(&o.serverImage, "server.image", "ghcr.io/knight42/krelay-server:v0.0.4", "The krelay-server image to use.")
_ = c.Execute()
}
0707010000000D000081A400000000000000000000000168B36E5000000A51000000000000000000000000000000000000001F00000000krelay-0.1.3/cmd/client/tcp.gopackage main
import (
"io"
"log/slog"
"net"
"k8s.io/apimachinery/pkg/util/httpstream"
"github.com/knight42/krelay/pkg/constants"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xio"
"github.com/knight42/krelay/pkg/xnet"
)
func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAddr xnet.Addr, dstPort uint16) {
defer clientConn.Close()
requestID := xnet.NewRequestID()
l := slog.With(slog.String(constants.LogFieldRequestID, requestID))
defer l.Debug("handleTCPConn exit")
l.Info("Handling tcp connection",
slog.String(constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort)),
slog.String(constants.LogFieldLocalAddr, clientConn.LocalAddr().String()),
slog.String("clientAddr", clientConn.RemoteAddr().String()),
)
dataStream, errorChan, err := createStream(serverConn, requestID)
if err != nil {
l.Error("Fail to create stream", slogutil.Error(err))
return
}
hdr := xnet.Header{
RequestID: requestID,
Protocol: xnet.ProtocolTCP,
Port: dstPort,
Addr: dstAddr,
}
_, err = xio.WriteFull(dataStream, hdr.Marshal())
if err != nil {
l.Error("Fail to write header", slogutil.Error(err))
return
}
var ack xnet.Acknowledgement
err = ack.FromReader(dataStream)
if err != nil {
l.Error("Fail to receive ack", slogutil.Error(err))
return
}
if ack.Code != xnet.AckCodeOK {
l.Error("Fail to connect", slogutil.Error(ack.Code))
return
}
localError := make(chan struct{})
remoteDone := make(chan struct{})
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(clientConn, dataStream); err != nil && !xnet.IsClosedConnectionError(err) {
l.Error("Fail to copy from remote stream to local connection", slogutil.Error(err))
}
// inform the select below that the remote copy is done
close(remoteDone)
}()
go func() {
// inform server we're not sending any more data after copy unblocks
defer dataStream.Close()
// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, clientConn); err != nil && !xnet.IsClosedConnectionError(err) {
l.Error("Fail to copy from local connection to remote stream", slogutil.Error(err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
}()
// wait for either a local->remote error or for copying from remote->local to finish
select {
case <-remoteDone:
case <-localError:
}
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
l.Error("Unexpected error from stream", slogutil.Error(err))
}
}
0707010000000E000081A400000000000000000000000168B36E5000000616000000000000000000000000000000000000002400000000krelay-0.1.3/cmd/client/template.gopackage main
import (
"bytes"
"os"
"path/filepath"
"strings"
"text/template"
)
func getProgramName() string {
name := filepath.Base(os.Args[0])
if strings.HasPrefix(name, "kubectl-") {
return "kubectl relay"
}
return name
}
func example() string {
const text = `
# Listen on port 8080 locally, forwarding data to the port named "http" in the service
{{.Name}} service/my-service 8080:http
# Listen on a random port locally, forwarding udp packets to port 53 in a pod selected by the deployment
{{.Name}} -n kube-system deploy/kube-dns :53@udp
# Listen on port 5353 on all addresses, forwarding data to port 53 in the pod
{{.Name}} --address 0.0.0.0 pod/my-pod 5353:53
# Listen on port 6379 locally, forwarding data to "redis.cn-north-1.cache.amazonaws.com:6379" from the cluster
{{.Name}} host/redis.cn-north-1.cache.amazonaws.com 6379
# Listen on port 5000 and 6000 locally, forwarding data to "1.2.3.4:5000" and "1.2.3.4:6000" from the cluster
{{.Name}} ip/1.2.3.4 5000@tcp 6000@udp
# Customize the server, and forward local port 5000 to "1.2.3.4:5000"
{{.Name}} --patch '{"metadata":{"namespace":"kube-public"},"spec":{"nodeSelector":{"k": "v"}}}' ip/1.2.3.4 5000
# Forward traffic to multiple targets
echo 'ip/1.2.3.4 5000\nsvc/my-service 8080:80\n-n kube-system deploy/coredns 5353:53@udp' | {{.Name}} -f -
`
tpl, err := template.New("example").Parse(text)
if err != nil {
panic(err)
}
var b bytes.Buffer
_ = tpl.Execute(&b, map[string]string{
"Name": getProgramName(),
})
return b.String()
}
0707010000000F000081A400000000000000000000000168B36E5000000908000000000000000000000000000000000000001F00000000krelay-0.1.3/cmd/client/udp.gopackage main
import (
"log/slog"
"net"
"k8s.io/apimachinery/pkg/util/httpstream"
"github.com/knight42/krelay/pkg/constants"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xio"
"github.com/knight42/krelay/pkg/xnet"
)
func handleUDPConn(clientConn net.PacketConn, cliAddr net.Addr, dataCh chan []byte, finish chan<- string, serverConn httpstream.Connection, dstAddr xnet.Addr, dstPort uint16) {
requestID := xnet.NewRequestID()
l := slog.With(slog.String(constants.LogFieldRequestID, requestID))
defer l.Debug("handleUDPConn exit")
defer func() {
finish <- cliAddr.String()
}()
l.Info("Handling udp connection",
slog.String(constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort)),
slog.String(constants.LogFieldLocalAddr, clientConn.LocalAddr().String()),
slog.String("clientAddr", cliAddr.String()),
)
dataStream, errorChan, err := createStream(serverConn, requestID)
if err != nil {
l.Error("Fail to create stream", slogutil.Error(err))
return
}
hdr := xnet.Header{
RequestID: requestID,
Protocol: xnet.ProtocolUDP,
Port: dstPort,
Addr: dstAddr,
}
_, err = xio.WriteFull(dataStream, hdr.Marshal())
if err != nil {
l.Error("Fail to write header", slogutil.Error(err))
return
}
var ack xnet.Acknowledgement
err = ack.FromReader(dataStream)
if err != nil {
l.Error("Fail to receive ack", slogutil.Error(err))
return
}
if ack.Code != xnet.AckCodeOK {
l.Error("Fail to connect", slogutil.Error(ack.Code))
return
}
upClosed := make(chan struct{})
go func() {
var (
data []byte
ok bool
)
for {
select {
case data, ok = <-dataCh:
if !ok {
return
}
case <-upClosed:
return
}
_, err = xio.WriteFull(dataStream, data)
if err != nil {
return
}
}
}()
go func() {
defer l.Debug("Server close connection")
defer close(upClosed)
buf := make([]byte, constants.UDPBufferSize)
for {
n, err := xnet.ReadUDPFromStream(dataStream, buf, 0)
if err != nil {
return
}
_, err = clientConn.WriteTo(buf[:n], cliAddr)
if err != nil {
return
}
}
}()
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
l.Error("Unexpected error from stream", slogutil.Error(err))
}
}
07070100000010000081A400000000000000000000000168B36E50000021B9000000000000000000000000000000000000002100000000krelay-0.1.3/cmd/client/utils.gopackage main
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
jsonpatch "github.com/evanphx/json-patch/v5"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)
func toPtr[T any](v T) *T {
return &v
}
func patchPod(patchBytes []byte, origPod corev1.Pod) (*corev1.Pod, error) {
patchJSONBytes, err := yaml.ToJSON(patchBytes)
if err != nil {
return nil, fmt.Errorf("convert patch to json: %w", err)
}
origBytes, err := json.Marshal(origPod)
if err != nil {
return nil, fmt.Errorf("marshal pod: %w", err)
}
after, err := jsonpatch.MergePatch(origBytes, patchJSONBytes)
if err != nil {
return nil, fmt.Errorf("apply merge patch: %w", err)
}
var patchedPod corev1.Pod
err = json.Unmarshal(after, &patchedPod)
if err != nil {
return nil, fmt.Errorf("unmarshal pod: %w", err)
}
return &patchedPod, nil
}
func ensureServerPodIsRunning(ctx context.Context, cs kubernetes.Interface, namespace, podName string) error {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()
w, err := cs.CoreV1().Pods(namespace).Watch(timeoutCtx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", podName),
})
if err != nil {
return fmt.Errorf("watch krelay-server pod: %w", err)
}
defer w.Stop()
running := false
loop:
for ev := range w.ResultChan() {
switch ev.Type {
case watch.Deleted, watch.Error:
break loop
case watch.Modified, watch.Added:
default:
continue
}
podObj := ev.Object.(*corev1.Pod)
for _, status := range podObj.Status.ContainerStatuses {
// there is only one container in the pod
if status.State.Running != nil {
running = true
break loop
}
slog.Debug("Pod is not running. Will retry.", slog.String("pod", podObj.Name))
}
}
if !running {
return fmt.Errorf("krelay-server pod is not running")
}
return nil
}
func removeServerPod(cs kubernetes.Interface, namespace, podName string, timeout time.Duration) {
l := slog.With(slog.String("pod", podName))
l.Info("Removing krelay-server pod")
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := cs.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{
GracePeriodSeconds: toPtr[int64](0),
})
if err != nil && !k8serr.IsNotFound(err) {
l.Error("Fail to remove krelay-server pod", slogutil.Error(err))
}
}
func addrGetterForObject(obj runtime.Object, cs kubernetes.Interface, ns string) (remoteaddr.Getter, error) {
switch actual := obj.(type) {
case *corev1.Pod:
addr, err := xnet.AddrFromIP(actual.Status.PodIP)
if err != nil {
return nil, err
}
return remoteaddr.NewStaticAddr(addr), nil
case *corev1.Service:
if actual.Spec.Type == corev1.ServiceTypeExternalName {
addr := xnet.AddrFromHost(actual.Spec.ExternalName)
return remoteaddr.NewStaticAddr(addr), nil
}
if actual.Spec.ClusterIP != corev1.ClusterIPNone {
addr, err := xnet.AddrFromIP(actual.Spec.ClusterIP)
if err != nil {
return nil, err
}
return remoteaddr.NewStaticAddr(addr), nil
}
if len(actual.Spec.Selector) == 0 {
return nil, fmt.Errorf("service selector is empty")
}
selector := labels.SelectorFromSet(actual.Spec.Selector)
return remoteaddr.NewDynamicAddr(cs, ns, selector.String())
case *appsv1.ReplicaSet:
selector, err := metav1.LabelSelectorAsSelector(actual.Spec.Selector)
if err != nil {
return nil, err
}
return remoteaddr.NewDynamicAddr(cs, ns, selector.String())
case *appsv1.Deployment:
selector, err := metav1.LabelSelectorAsSelector(actual.Spec.Selector)
if err != nil {
return nil, err
}
return remoteaddr.NewDynamicAddr(cs, ns, selector.String())
case *appsv1.StatefulSet:
selector, err := metav1.LabelSelectorAsSelector(actual.Spec.Selector)
if err != nil {
return nil, err
}
return remoteaddr.NewDynamicAddr(cs, ns, selector.String())
case *appsv1.DaemonSet:
selector, err := metav1.LabelSelectorAsSelector(actual.Spec.Selector)
if err != nil {
return nil, err
}
return remoteaddr.NewDynamicAddr(cs, ns, selector.String())
}
return nil, fmt.Errorf("unknown object: %T", obj)
}
func createStream(c httpstream.Connection, reqID string) (dataStream httpstream.Stream, errCh chan error, err error) {
// create error stream
headers := http.Header{}
headers.Set(corev1.StreamType, corev1.StreamTypeError)
headers.Set(corev1.PortHeader, strconv.Itoa(constants.ServerPort))
headers.Set(corev1.PortForwardRequestIDHeader, reqID)
errStream, err := c.CreateStream(headers)
if err != nil {
return nil, nil, errors.New("create error stream")
}
// we're not writing to this stream
_ = errStream.Close()
// create data stream
headers.Set(corev1.StreamType, corev1.StreamTypeData)
dataStream, err = c.CreateStream(headers)
if err != nil {
return nil, nil, fmt.Errorf("create data stream: %w", err)
}
errCh = make(chan error)
go func() {
message, err := io.ReadAll(errStream)
errMsg := string(message)
switch {
case err != nil:
errMsg = err.Error()
errCh <- fmt.Errorf("error reading from error stream: %w", err)
case len(message) > 0:
errCh <- fmt.Errorf("an error occurred forwarding: %v", errMsg)
}
close(errCh)
// check if the spdy connection is corrupted
if ok, _ := filepath.Match("*network namespace for sandbox * is closed", errMsg); ok {
_ = c.Close()
}
}()
return dataStream, errCh, nil
}
func validateFields(fields []string) error {
if len(fields) < 2 {
return fmt.Errorf("invalid syntax")
}
resourceParts := strings.Split(fields[0], "/")
if len(resourceParts) > 2 {
return fmt.Errorf("unknown resource: %q", fields[0])
}
if resourceParts[0] == "ip" {
isInvalid := net.ParseIP(resourceParts[1]) == nil
if isInvalid {
return fmt.Errorf("invalid IP address: %q", resourceParts[1])
}
}
return nil
}
type target struct {
resource string
ports []string
namespace string
}
func parseTargetsFile(r io.Reader, defaultNamespace string) ([]target, error) {
fs := flag.NewFlagSet("targets", flag.ContinueOnError)
fs.SetOutput(io.Discard)
var ns string
fs.StringVar(&ns, "n", defaultNamespace, "namespace")
s := bufio.NewScanner(r)
var ret []target
lineNo := 0
for s.Scan() {
lineNo++
line := strings.TrimSpace(s.Text())
if len(line) == 0 || strings.HasPrefix(line, "//") || strings.HasPrefix(line, "#") {
continue
}
fields := strings.Fields(line)
// We need to reset the state before parsing the next line
ns = defaultNamespace
err := fs.Parse(fields)
if err != nil {
return nil, fmt.Errorf("line: %d: %w", lineNo, err)
}
remain := fs.Args()
err = validateFields(remain)
if err != nil {
return nil, fmt.Errorf("line %d: %w", lineNo, err)
}
ret = append(ret, target{
resource: remain[0],
ports: remain[1:],
namespace: ns,
})
}
return ret, nil
}
func createDialer(restCfg *rest.Config, dstURL *url.URL) (httpstream.Dialer, error) {
// Excerpt from https://github.com/kubernetes/kubernetes/blob/f5c538418189e119a8dbb60e2a2b22394548e326/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go#L139
transport, upgrader, err := spdy.RoundTripperFor(restCfg)
if err != nil {
return nil, err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, dstURL)
if strings.ToLower(os.Getenv("KUBECTL_PORT_FORWARD_WEBSOCKETS")) != "false" {
slog.Debug("Trying to forward ports using websocket")
tunnelDialer, err := portforward.NewSPDYOverWebsocketDialer(dstURL, restCfg)
if err != nil {
return nil, fmt.Errorf("create tunneling dialer: %w", err)
}
dialer = portforward.NewFallbackDialer(tunnelDialer, dialer, func(err error) bool {
shouldFallback := httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
if shouldFallback {
slog.Debug("Websocket upgrade failed, falling back to SPDY")
}
return shouldFallback
})
}
return dialer, nil
}
07070100000011000081A400000000000000000000000168B36E5000000C8E000000000000000000000000000000000000002600000000krelay-0.1.3/cmd/client/utils_test.gopackage main
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestParseTargetsFile(t *testing.T) {
testCases := map[string]struct {
input string
defaultNamespace string
expect []target
expectErr string
}{
// valid cases
"normal": {
input: `bar 53@udp 53@tcp`,
expect: []target{
{
resource: "bar",
ports: []string{"53@udp", "53@tcp"},
},
},
},
"comment & empty line": {
input: `
# comment
ip/1.2.3.4 8080
// comment
host/google.com 443@tcp
pod/foo 8000 8001
`,
expect: []target{
{
resource: "ip/1.2.3.4",
ports: []string{"8080"},
},
{
resource: "host/google.com",
ports: []string{"443@tcp"},
},
{
resource: "pod/foo",
ports: []string{"8000", "8001"},
},
},
},
"different namespaces": {
defaultNamespace: "foo",
input: `
-n bar1 svc/q 8000
host/q.com 8000 9000:9001
-n=bar2 svc/q 8000
svc/q 8000
`,
expect: []target{
{
resource: "svc/q",
ports: []string{"8000"},
namespace: "bar1",
},
{
resource: "host/q.com",
ports: []string{"8000", "9000:9001"},
namespace: "foo",
},
{
resource: "svc/q",
ports: []string{"8000"},
namespace: "bar2",
},
{
resource: "svc/q",
ports: []string{"8000"},
namespace: "foo",
},
},
},
// invalid cases
"invalid ip": {
input: `ip/1.2.3 8080`,
expectErr: "invalid IP address",
},
"unknown flag": {
input: `-invalid-flag foo 8080`,
expectErr: "flag provided but not defined: -invalid-flag",
},
"missing value for -n flag": {
input: `-n foo 8080`,
expectErr: "invalid syntax",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
got, err := parseTargetsFile(strings.NewReader(tc.input), tc.defaultNamespace)
if len(tc.expectErr) == 0 {
require.NoError(t, err)
require.Equal(t, tc.expect, got)
return
}
require.Error(t, err)
t.Logf("error: %v", err)
require.ErrorContains(t, err, tc.expectErr)
})
}
}
func TestPatchPod(t *testing.T) {
testCases := map[string]struct {
patch string
origPod corev1.Pod
expected corev1.Pod
}{
"patch in json": {
patch: `{"metadata": {"name": "foo"}}`,
origPod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
Namespace: metav1.NamespaceDefault,
},
},
expected: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
},
},
"patch in yaml": {
patch: `
metadata:
name: foo`,
origPod: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "bar",
Namespace: metav1.NamespaceDefault,
},
},
expected: corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: metav1.NamespaceDefault,
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
got, err := patchPod([]byte(tc.patch), tc.origPod)
require.NoError(t, err)
require.Equal(t, tc.expected, *got)
})
}
}
07070100000012000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001800000000krelay-0.1.3/cmd/server07070100000013000081A400000000000000000000000168B36E5000000F37000000000000000000000000000000000000002000000000krelay-0.1.3/cmd/server/main.gopackage main
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"time"
"github.com/spf13/cobra"
"github.com/knight42/krelay/pkg/constants"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)
type options struct {
connectTimeout time.Duration
}
func (o *options) run(ctx context.Context) error {
tcpListener, err := net.Listen(constants.ProtocolTCP, fmt.Sprintf("0.0.0.0:%d", constants.ServerPort))
if err != nil {
return err
}
defer tcpListener.Close()
dialer := net.Dialer{Timeout: o.connectTimeout}
slog.Info("Accepting connections")
for {
c, err := tcpListener.Accept()
if err != nil {
var tmpErr interface {
Temporary() bool
}
if errors.As(err, &tmpErr) && tmpErr.Temporary() {
continue
}
slog.Error("Fail to accept connection", slogutil.Error(err))
return err
}
go handleConn(ctx, c.(*net.TCPConn), &dialer)
}
}
func writeACK(c net.Conn, ack xnet.Acknowledgement) error {
data := ack.Marshal()
_, err := c.Write(data)
return err
}
func ackCodeFromErr(err error) xnet.AckCode {
var dnsErr *net.DNSError
if errors.As(err, &dnsErr) {
if dnsErr.IsNotFound {
return xnet.AckCodeNoSuchHost
}
if dnsErr.IsTimeout {
return xnet.AckCodeResolveTimeout
}
}
var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Timeout() {
return xnet.AckCodeConnectTimeout
}
return xnet.AckCodeUnknownError
}
func handleConn(ctx context.Context, c *net.TCPConn, dialer *net.Dialer) {
defer c.Close()
hdr := xnet.Header{}
err := hdr.FromReader(c)
if err != nil {
slog.Error("Fail to read header", slogutil.Error(err))
return
}
dstAddr := xnet.JoinHostPort(hdr.Addr.String(), hdr.Port)
l := slog.With(slog.String(constants.LogFieldRequestID, hdr.RequestID))
switch hdr.Protocol {
case xnet.ProtocolTCP:
upstreamConn, err := dialer.DialContext(ctx, constants.ProtocolTCP, dstAddr)
if err != nil {
l.Error("Fail to create tcp connection", slog.String(constants.LogFieldDestAddr, dstAddr), slogutil.Error(err))
_ = writeACK(c, xnet.Acknowledgement{
Code: ackCodeFromErr(err),
})
return
}
err = writeACK(c, xnet.Acknowledgement{
Code: xnet.AckCodeOK,
})
if err != nil {
l.Error("Fail to write ack", slogutil.Error(err))
return
}
l.Info("Start proxy tcp request", slog.String(constants.LogFieldDestAddr, dstAddr))
xnet.ProxyTCP(hdr.RequestID, c, upstreamConn.(*net.TCPConn))
case xnet.ProtocolUDP:
upstreamConn, err := dialer.DialContext(ctx, constants.ProtocolUDP, dstAddr)
if err != nil {
l.Error("Fail to create udp connection", slog.String(constants.LogFieldDestAddr, dstAddr), slogutil.Error(err))
_ = writeACK(c, xnet.Acknowledgement{
Code: ackCodeFromErr(err),
})
return
}
err = writeACK(c, xnet.Acknowledgement{
Code: xnet.AckCodeOK,
})
if err != nil {
l.Error("Fail to write ack", slogutil.Error(err))
return
}
l.Info("Start proxy udp request", slog.String(constants.LogFieldDestAddr, dstAddr))
udpConn := &xnet.UDPConn{UDPConn: upstreamConn.(*net.UDPConn)}
xnet.ProxyUDP(hdr.RequestID, c, udpConn)
default:
l.Error("Unknown protocol", slog.String(constants.LogFieldDestAddr, dstAddr), slog.Any(constants.LogFieldProtocol, hdr.Protocol))
err = writeACK(c, xnet.Acknowledgement{
Code: xnet.AckCodeUnknownProtocol,
})
if err != nil {
l.Error("Fail to write ack", slogutil.Error(err))
return
}
}
}
func main() {
o := options{}
c := cobra.Command{
Use: constants.ServerName,
RunE: func(_ *cobra.Command, _ []string) (err error) {
return o.run(context.TODO())
},
SilenceUsage: true,
}
flags := c.Flags()
flags.DurationVar(&o.connectTimeout, "connect-timeout", time.Second*10, "Timeout for connecting to upstream")
flags.IntP("v", "v", 0, "bogus flag to keep backward compatibility. This flag will be removed in the future.")
_ = c.Execute()
}
07070100000014000081A400000000000000000000000168B36E50000006A6000000000000000000000000000000000000002400000000krelay-0.1.3/cmd/server/tcp_test.gopackage main
import (
"context"
"fmt"
"io"
"net"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/knight42/krelay/pkg/testutils/tcp"
"github.com/knight42/krelay/pkg/xio"
"github.com/knight42/krelay/pkg/xnet"
)
func TestHandleTCPConn(t *testing.T) {
const msg = "Hello, World!"
ts, tsURL, tsPort := tcp.NewTLSServer(t, func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(msg))
})
defer ts.Close()
dialer := net.Dialer{Timeout: time.Second * 10}
ctx := context.Background()
l := tcp.NewTCPServer(t, func(c net.Conn) {
handleConn(ctx, c.(*net.TCPConn), &dialer)
})
defer l.Close()
client := ts.Client()
transport := client.Transport.(*http.Transport)
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
c, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
hdr := xnet.Header{
RequestID: xnet.NewRequestID(),
Protocol: xnet.ProtocolTCP,
Port: tsPort,
Addr: xnet.AddrFromHost(tsURL.Hostname()),
}
_, err = xio.WriteFull(c, hdr.Marshal())
if err != nil {
return nil, fmt.Errorf("write header: %w", err)
}
var ack xnet.Acknowledgement
err = ack.FromReader(c)
if err != nil {
return nil, fmt.Errorf("read ack: %w", err)
}
if ack.Code != xnet.AckCodeOK {
return nil, fmt.Errorf("ack: %s", ack.Code.Error())
}
return c, nil
}
r := require.New(t)
resp, err := ts.Client().Get("https://" + l.Addr().String())
r.NoError(err)
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
r.NoError(err)
t.Logf("Got body: %s", string(body))
r.Equal(msg, string(body))
}
07070100000015000081A400000000000000000000000168B36E5000000CC5000000000000000000000000000000000000001400000000krelay-0.1.3/go.modmodule github.com/knight42/krelay
go 1.24.0
toolchain go1.24.2
require (
github.com/evanphx/json-patch/v5 v5.9.11
github.com/spf13/cobra v1.9.1
github.com/stretchr/testify v1.10.0
k8s.io/api v0.33.3
k8s.io/apimachinery v0.33.3
k8s.io/cli-runtime v0.33.3
k8s.io/client-go v0.33.3
k8s.io/klog/v2 v2.130.1
)
require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/spdystream v0.5.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sync v0.12.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/kustomize/api v0.19.0 // indirect
sigs.k8s.io/kustomize/kyaml v0.19.0 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
07070100000016000081A400000000000000000000000168B36E5000004B34000000000000000000000000000000000000001400000000krelay-0.1.3/go.sumgithub.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE=
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo=
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU=
github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 h1:n6/2gBQ3RWajuToeY6ZtZTIKv2v7ThUy5KKusIT0yc0=
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod h1:Pm3mSP3c5uWn86xMLZ5Sa7JB9GsEZySvHYXCTK4E9q4=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM=
github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4=
github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ=
golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4=
gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.33.3 h1:SRd5t//hhkI1buzxb288fy2xvjubstenEKL9K51KBI8=
k8s.io/api v0.33.3/go.mod h1:01Y/iLUjNBM3TAvypct7DIj0M0NIZc+PzAHCIo0CYGE=
k8s.io/apimachinery v0.33.3 h1:4ZSrmNa0c/ZpZJhAgRdcsFcZOw1PQU1bALVQ0B3I5LA=
k8s.io/apimachinery v0.33.3/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
k8s.io/cli-runtime v0.33.3 h1:Dgy4vPjNIu8LMJBSvs8W0LcdV0PX/8aGG1DA1W8lklA=
k8s.io/cli-runtime v0.33.3/go.mod h1:yklhLklD4vLS8HNGgC9wGiuHWze4g7x6XQZ+8edsKEo=
k8s.io/client-go v0.33.3 h1:M5AfDnKfYmVJif92ngN532gFqakcGi6RvaOF16efrpA=
k8s.io/client-go v0.33.3/go.mod h1:luqKBQggEf3shbxHY4uVENAxrDISLOarxpTKMiUuujg=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4=
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8=
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro=
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8=
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo=
sigs.k8s.io/kustomize/api v0.19.0 h1:F+2HB2mU1MSiR9Hp1NEgoU2q9ItNOaBJl0I4Dlus5SQ=
sigs.k8s.io/kustomize/api v0.19.0/go.mod h1:/BbwnivGVcBh1r+8m3tH1VNxJmHSk1PzP5fkP6lbL1o=
sigs.k8s.io/kustomize/kyaml v0.19.0 h1:RFge5qsO1uHhwJsu3ipV7RNolC7Uozc0jUBC/61XSlA=
sigs.k8s.io/kustomize/kyaml v0.19.0/go.mod h1:FeKD5jEOH+FbZPpqUghBP8mrLjJ3+zD3/rf9NNu1cwY=
sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=
sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY=
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc=
sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
07070100000017000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001700000000krelay-0.1.3/manifests07070100000018000081A400000000000000000000000168B36E500000019B000000000000000000000000000000000000002900000000krelay-0.1.3/manifests/Dockerfile-serverFROM golang:1.22.5 AS builder
WORKDIR /workspace
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
--mount=target=. \
CGO_ENABLED=0 go build -trimpath -ldflags '-w -s' -o /tmp/server ./cmd/server
FROM gcr.io/distroless/static:nonroot
LABEL maintainer="Jian Zeng <anonymousknight96@gmail.com>"
COPY --from=builder /tmp/server /server
ENTRYPOINT ["/server"]
07070100000019000081A400000000000000000000000168B36E5000000360000000000000000000000000000000000000002100000000krelay-0.1.3/manifests/rbac.yamlapiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: krelay
rules:
- apiGroups:
- ""
resources:
- pods
- pods/portforward
verbs:
# create the krelay-server pod and forward local port to it
- create
# watch the krelay-server pod
- watch
# clean the krelay-server pod
- delete
# The following permissions is only requried if you want to forward the local port to the respective objects.
- apiGroups:
- ""
resources:
- services
- pods
verbs:
- get
- apiGroups:
- apps
resources:
- replicasets
- deployments
- statefulsets
- daemonsets
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: krelay
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: krelay
subjects:
- kind: User
name: <Please fill in your username here>
0707010000001A000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001100000000krelay-0.1.3/pkg0707010000001B000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001700000000krelay-0.1.3/pkg/alarm0707010000001C000081A400000000000000000000000168B36E50000002C4000000000000000000000000000000000000002000000000krelay-0.1.3/pkg/alarm/alarm.gopackage alarm
import (
"time"
)
type Alarm struct {
d time.Duration
reset chan struct{}
done chan struct{}
}
func New(d time.Duration) Alarm {
return Alarm{
d: d,
reset: make(chan struct{}),
done: make(chan struct{}),
}
}
func stopTimer(t *time.Timer) {
if !t.Stop() {
<-t.C
}
}
func (a *Alarm) Start() {
go func() {
t := time.NewTimer(a.d)
for {
select {
case <-t.C:
close(a.done)
stopTimer(t)
return
case <-a.reset:
stopTimer(t)
t.Reset(a.d)
}
}
}()
}
func (a *Alarm) Reset() {
select {
case a.reset <- struct{}{}:
case <-a.done:
}
}
func (a *Alarm) Done() bool {
select {
case <-a.done:
return true
default:
return false
}
}
0707010000001D000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001B00000000krelay-0.1.3/pkg/constants0707010000001E000081A400000000000000000000000168B36E50000001A7000000000000000000000000000000000000002800000000krelay-0.1.3/pkg/constants/constants.gopackage constants
const (
LogFieldRequestID = "reqID"
LogFieldDestAddr = "dstAddr"
LogFieldLocalAddr = "localAddr"
LogFieldRemotePort = "remotePort"
LogFieldProtocol = "protocol"
)
const (
ServerName = "krelay-server"
ServerPort = 9527
)
const (
UDPBufferSize = 65536 + 2
TCPBufferSize = 32768
)
const PortForwardProtocolV1Name = "portforward.k8s.io"
const (
ProtocolTCP = "tcp"
ProtocolUDP = "udp"
)
0707010000001F000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001700000000krelay-0.1.3/pkg/ports07070100000020000081A400000000000000000000000168B36E50000000A1000000000000000000000000000000000000001F00000000krelay-0.1.3/pkg/ports/pair.gopackage ports
// PortPair consists of localPort, remotePort and the protocol.
type PortPair struct {
LocalPort uint16
RemotePort uint16
Protocol string
}
07070100000021000081A400000000000000000000000168B36E5000001163000000000000000000000000000000000000002100000000krelay-0.1.3/pkg/ports/parser.gopackage ports
import (
"fmt"
"strconv"
"strings"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/knight42/krelay/pkg/constants"
)
type Parser struct {
args []string
obj runtime.Object
}
func (p Parser) WithObject(obj runtime.Object) Parser {
p.obj = obj
return p
}
func (p *Parser) Parse() ([]PortPair, error) {
var (
allPorts portsInObject
err error
)
if p.obj != nil {
allPorts, err = getPortsFromObject(p.obj)
if err != nil {
return nil, err
}
}
ret := make([]PortPair, 0, len(p.args))
for _, arg := range p.args {
var (
proto string
portPair PortPair
)
protoIdx := strings.IndexRune(arg, '@')
if protoIdx > 0 {
if protoIdx < len(arg)-1 {
proto = arg[protoIdx+1:]
switch proto {
case constants.ProtocolTCP, constants.ProtocolUDP:
default:
return nil, fmt.Errorf("unknown protocol: %q", proto)
}
}
arg = arg[:protoIdx]
}
portPair.Protocol = proto
enforceProtocol := len(proto) > 0
var (
localStr, remoteStr string
)
parts := strings.Split(arg, ":")
switch len(parts) {
case 1:
remoteStr = parts[0]
case 2:
localStr, remoteStr = parts[0], parts[1]
if len(localStr) == 0 {
localStr = "0"
}
default:
return nil, fmt.Errorf("invalid port format: %q", arg)
}
// determine the remote port and protocol
remotePort, err := parsePort(remoteStr)
if err != nil {
if p.obj == nil {
return nil, err
}
// assume it's a name of port
port, ok := allPorts.Names[remoteStr]
if !ok {
return nil, fmt.Errorf("port name not found: %q", remoteStr)
}
portPair.RemotePort = port.Port
if !enforceProtocol {
portPair.Protocol = port.Protocol
}
} else {
portPair.RemotePort = remotePort
if !enforceProtocol && p.obj != nil {
if protos, ok := allPorts.Protocols[remotePort]; ok {
if len(protos) > 1 {
return nil, fmt.Errorf("ambiguous protocol of port: %d: %v", remotePort, protos)
}
portPair.Protocol = protos[0]
}
}
}
if len(portPair.Protocol) == 0 {
// fallback to TCP
portPair.Protocol = constants.ProtocolTCP
}
// determine the local port
if len(localStr) == 0 {
portPair.LocalPort = portPair.RemotePort
} else {
portPair.LocalPort, err = parsePort(localStr)
if err != nil {
return nil, err
}
}
ret = append(ret, portPair)
}
return ret, nil
}
// NewParser creates a new parser that parse ports in args.
func NewParser(args []string) Parser {
return Parser{args: args}
}
func parsePort(s string) (uint16, error) {
port, err := strconv.ParseUint(s, 10, 16)
if err != nil {
return 0, fmt.Errorf("invalid port: %q", s)
}
return uint16(port), nil
}
type portsInObject struct {
Names map[string]portWithProtocol
Protocols map[uint16][]string
}
type portWithProtocol struct {
Port uint16
Protocol string
}
func getPortsFromObject(obj runtime.Object) (portsInObject, error) {
switch actual := obj.(type) {
case *corev1.Service:
ret := portsInObject{
Names: map[string]portWithProtocol{},
Protocols: map[uint16][]string{},
}
for _, port := range actual.Spec.Ports {
po := uint16(port.Port)
proto := strings.ToLower(string(port.Protocol))
ret.Names[port.Name] = portWithProtocol{
Port: po,
Protocol: proto,
}
ret.Protocols[po] = append(ret.Protocols[po], proto)
}
return ret, nil
case *corev1.Pod:
return getPortsFromPodSpec(&actual.Spec), nil
case *appsv1.Deployment:
return getPortsFromPodSpec(&actual.Spec.Template.Spec), nil
case *appsv1.StatefulSet:
return getPortsFromPodSpec(&actual.Spec.Template.Spec), nil
case *appsv1.ReplicaSet:
return getPortsFromPodSpec(&actual.Spec.Template.Spec), nil
case *appsv1.DaemonSet:
return getPortsFromPodSpec(&actual.Spec.Template.Spec), nil
default:
return portsInObject{}, fmt.Errorf("unknown object: %T", obj)
}
}
func getPortsFromPodSpec(podSpec *corev1.PodSpec) portsInObject {
ret := portsInObject{
Names: map[string]portWithProtocol{},
Protocols: map[uint16][]string{},
}
for _, ct := range podSpec.Containers {
for _, ctPort := range ct.Ports {
po := uint16(ctPort.ContainerPort)
proto := strings.ToLower(string(ctPort.Protocol))
ret.Names[ctPort.Name] = portWithProtocol{
Port: po,
Protocol: proto,
}
ret.Protocols[po] = append(ret.Protocols[po], proto)
}
}
return ret
}
07070100000022000081A400000000000000000000000168B36E500000215E000000000000000000000000000000000000002600000000krelay-0.1.3/pkg/ports/parser_test.gopackage ports
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/knight42/krelay/pkg/constants"
)
func TestGetPortsFromPodSpec(t *testing.T) {
testCases := map[string]struct {
podSpec corev1.PodSpec
expected portsInObject
}{
"no ports": {
podSpec: corev1.PodSpec{
InitContainers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "test",
ContainerPort: 123,
},
},
},
},
},
expected: portsInObject{
Names: map[string]portWithProtocol{},
Protocols: map[uint16][]string{},
},
},
"same port with different protocols": {
podSpec: corev1.PodSpec{
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "tcp-dns",
ContainerPort: 53,
Protocol: corev1.ProtocolTCP,
},
{
Name: "test",
ContainerPort: 80,
Protocol: corev1.ProtocolTCP,
},
},
},
{
Ports: []corev1.ContainerPort{
{
Name: "udp-dns",
ContainerPort: 53,
Protocol: corev1.ProtocolUDP,
},
},
},
},
},
expected: portsInObject{
Names: map[string]portWithProtocol{
"tcp-dns": {
Port: 53,
Protocol: constants.ProtocolTCP,
},
"udp-dns": {
Port: 53,
Protocol: constants.ProtocolUDP,
},
"test": {
Port: 80,
Protocol: constants.ProtocolTCP,
},
},
Protocols: map[uint16][]string{
53: {constants.ProtocolTCP, constants.ProtocolUDP},
80: {constants.ProtocolTCP},
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
ports := getPortsFromPodSpec(&tc.podSpec)
r.Equal(tc.expected, ports)
})
}
}
func TestGetPortsFromObject(t *testing.T) {
testCases := map[string]struct {
obj runtime.Object
expected portsInObject
}{
"service": {
obj: &corev1.Service{
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "tcp-dns",
Port: 53,
Protocol: corev1.ProtocolTCP,
},
{
Name: "udp-dns",
Port: 53,
Protocol: corev1.ProtocolUDP,
},
},
},
},
expected: portsInObject{
Names: map[string]portWithProtocol{
"tcp-dns": {
Port: 53,
Protocol: constants.ProtocolTCP,
},
"udp-dns": {
Port: 53,
Protocol: constants.ProtocolUDP,
},
},
Protocols: map[uint16][]string{
53: {constants.ProtocolTCP, constants.ProtocolUDP},
},
},
},
"deployment": {
obj: &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "tcp-dns",
ContainerPort: 53,
Protocol: corev1.ProtocolTCP,
},
{
Name: "udp-dns",
ContainerPort: 53,
Protocol: corev1.ProtocolUDP,
},
},
},
{
Ports: []corev1.ContainerPort{
{
Name: "test",
ContainerPort: 8080,
Protocol: corev1.ProtocolTCP,
},
},
},
},
},
},
},
},
expected: portsInObject{
Names: map[string]portWithProtocol{
"tcp-dns": {
Port: 53,
Protocol: constants.ProtocolTCP,
},
"udp-dns": {
Port: 53,
Protocol: constants.ProtocolUDP,
},
"test": {
Port: 8080,
Protocol: constants.ProtocolTCP,
},
},
Protocols: map[uint16][]string{
53: {constants.ProtocolTCP, constants.ProtocolUDP},
8080: {constants.ProtocolTCP},
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
got, err := getPortsFromObject(tc.obj)
r.NoError(err)
r.Equal(tc.expected, got)
})
}
}
func TestParsePorts(t *testing.T) {
testCases := map[string]struct {
obj runtime.Object
args []string
expected []PortPair
expectedErr error
}{
"simple": {
args: []string{"5353@udp", ":8080", "8443:443@tcp"},
expected: []PortPair{
{
LocalPort: 5353,
RemotePort: 5353,
Protocol: constants.ProtocolUDP,
},
{
LocalPort: 0,
RemotePort: 8080,
Protocol: constants.ProtocolTCP,
},
{
LocalPort: 8443,
RemotePort: 443,
Protocol: constants.ProtocolTCP,
},
},
},
"port name as remote port": {
args: []string{"udp-dns", ":tcp-dns", "5353:udp-dns"},
obj: &corev1.Service{
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "udp-dns",
Port: 53,
Protocol: corev1.ProtocolUDP,
},
{
Name: "tcp-dns",
Port: 53,
Protocol: corev1.ProtocolTCP,
},
},
},
},
expected: []PortPair{
{
LocalPort: 53,
RemotePort: 53,
Protocol: constants.ProtocolUDP,
},
{
LocalPort: 0,
RemotePort: 53,
Protocol: constants.ProtocolTCP,
},
{
LocalPort: 5353,
RemotePort: 53,
Protocol: constants.ProtocolUDP,
},
},
},
"automatically determine protocol": {
args: []string{"5353:53", "8080"},
obj: &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "tcp",
ContainerPort: 8080,
Protocol: corev1.ProtocolTCP,
},
{
Name: "udp",
ContainerPort: 53,
Protocol: corev1.ProtocolUDP,
},
},
},
},
},
},
},
},
expected: []PortPair{
{
LocalPort: 5353,
RemotePort: 53,
Protocol: constants.ProtocolUDP,
},
{
LocalPort: 8080,
RemotePort: 8080,
Protocol: constants.ProtocolTCP,
},
},
},
"override protocol": {
args: []string{"5353@tcp"},
obj: &appsv1.DaemonSet{
Spec: appsv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "udp",
ContainerPort: 53,
Protocol: corev1.ProtocolUDP,
},
},
},
},
},
},
},
},
expected: []PortPair{
{
LocalPort: 5353,
RemotePort: 5353,
Protocol: constants.ProtocolTCP,
},
},
},
"ambiguous protocol": {
args: []string{"8080"},
obj: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
{
Name: "tcp",
ContainerPort: 8080,
Protocol: corev1.ProtocolTCP,
},
{
Name: "udp",
ContainerPort: 8080,
Protocol: corev1.ProtocolUDP,
},
},
},
},
},
},
expectedErr: fmt.Errorf("ambiguous protocol of port: 8080: [tcp udp]"),
},
"port name not found": {
args: []string{"no-such-port"},
obj: &appsv1.Deployment{},
expectedErr: fmt.Errorf(`port name not found: "no-such-port"`),
},
"unknown protocol": {
args: []string{"8080@sctp"},
expectedErr: fmt.Errorf(`unknown protocol: "sctp"`),
},
"invalid port format": {
args: []string{"1:2:3"},
expectedErr: fmt.Errorf(`invalid port format: "1:2:3"`),
},
"invalid remote port": {
args: []string{"foo"},
expectedErr: fmt.Errorf(`invalid port: "foo"`),
},
"invalid local port": {
args: []string{"foo:123"},
expectedErr: fmt.Errorf(`invalid port: "foo"`),
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
p := NewParser(tc.args).WithObject(tc.obj)
got, err := p.Parse()
if tc.expectedErr != nil {
r.Equal(tc.expectedErr, err)
return
}
r.NoError(err)
r.Equal(tc.expected, got)
})
}
}
07070100000023000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001C00000000krelay-0.1.3/pkg/remoteaddr07070100000024000081A400000000000000000000000168B36E5000000C96000000000000000000000000000000000000002700000000krelay-0.1.3/pkg/remoteaddr/dynamic.gopackage remoteaddr
import (
"context"
"errors"
"fmt"
"log/slog"
"sort"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)
type dynamicAddr struct {
podCli typedcorev1.PodInterface
selector string
mu sync.RWMutex
podName string
addr xnet.Addr
}
var _ Getter = (*dynamicAddr)(nil)
func (d *dynamicAddr) Get() (xnet.Addr, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.addr, nil
}
func (d *dynamicAddr) watchForUpdates(w watch.Interface) {
defer w.Stop()
for ev := range w.ResultChan() {
slog.Debug("Receive event", slog.Any("event", ev))
switch ev.Type {
case watch.Bookmark, watch.Error:
slog.Debug("Ignore specific events", slog.String("type", string(ev.Type)))
continue
default: // make linter happy
}
pod, ok := ev.Object.(*corev1.Pod)
if !ok || pod.Name != d.podName {
slog.Debug("Ignore event from unrelated pod",
slog.String("pod", pod.Name),
slog.String("current", d.podName),
)
continue
}
if ev.Type == watch.Modified && pod.DeletionTimestamp == nil && pod.Status.Phase == corev1.PodRunning {
slog.Debug("Ignore event since the pod is still running", slog.String("pod", pod.Name))
continue
}
slog.Debug("Try to update remote address", slog.String("current", d.podName))
err := wait.PollUntilContextTimeout(context.TODO(), time.Second*2, time.Minute, true, func(ctx context.Context) (bool, error) {
_, err := d.updatePodIP(ctx)
if err == nil {
return true, nil
}
slog.Warn("Fail to update remote address. Will retry.", slogutil.Error(err))
return false, nil
})
if err != nil {
slog.Error("Fail to update remote address within timeout")
} else {
slog.Debug("Successfully update remote address", slog.String("current", d.podName))
}
}
}
func (d *dynamicAddr) updatePodIP(ctx context.Context) (rv string, err error) {
podList, err := d.podCli.List(ctx, metav1.ListOptions{
LabelSelector: d.selector,
})
if err != nil {
return "", fmt.Errorf("list pods: %w", err)
}
pods := podList.Items
sort.Slice(pods, func(i, j int) bool {
return !pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
})
for _, pod := range pods {
if pod.Status.Phase == corev1.PodRunning {
d.mu.Lock()
d.podName = pod.Name
d.addr, _ = xnet.AddrFromIP(pod.Status.PodIP)
d.mu.Unlock()
return podList.ResourceVersion, nil
}
}
return "", errors.New("no running pods found")
}
func (d *dynamicAddr) init() error {
rv, err := d.updatePodIP(context.Background())
if err != nil {
return err
}
w, err := watchtools.NewRetryWatcherWithContext(context.Background(), rv, &cache.ListWatch{
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return d.podCli.Watch(ctx, options)
},
})
if err != nil {
return fmt.Errorf("watch pods: %w", err)
}
go d.watchForUpdates(w)
return nil
}
07070100000025000081A400000000000000000000000168B36E5000000203000000000000000000000000000000000000002A00000000krelay-0.1.3/pkg/remoteaddr/remoteaddr.gopackage remoteaddr
import (
"fmt"
"k8s.io/client-go/kubernetes"
"github.com/knight42/krelay/pkg/xnet"
)
type Getter interface {
Get() (xnet.Addr, error)
}
func NewStaticAddr(addr xnet.Addr) Getter {
return &staticAddr{addr: addr}
}
func NewDynamicAddr(cs kubernetes.Interface, ns, selector string) (Getter, error) {
ret := &dynamicAddr{
podCli: cs.CoreV1().Pods(ns),
selector: selector,
}
err := ret.init()
if err != nil {
return nil, fmt.Errorf("init pod ip: %w", err)
}
return ret, nil
}
07070100000026000081A400000000000000000000000168B36E50000000DC000000000000000000000000000000000000002600000000krelay-0.1.3/pkg/remoteaddr/static.gopackage remoteaddr
import (
"github.com/knight42/krelay/pkg/xnet"
)
type staticAddr struct {
addr xnet.Addr
}
var _ Getter = (*staticAddr)(nil)
func (s *staticAddr) Get() (xnet.Addr, error) {
return s.addr, nil
}
07070100000027000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001600000000krelay-0.1.3/pkg/slog07070100000028000081A400000000000000000000000168B36E5000000218000000000000000000000000000000000000001E00000000krelay-0.1.3/pkg/slog/slog.gopackage slog
import (
"log/slog"
)
// Error returns an Attr for an error.
func Error(err error) slog.Attr {
return slog.String("error", err.Error())
}
// Uint16 converts an uint16 to an uint64 and returns
// an Attr with that value.
func Uint16(key string, v uint16) slog.Attr {
return slog.Uint64(key, uint64(v))
}
func MapVerbosityToLogLevel(v int) slog.Level {
switch {
case v >= 4:
return slog.LevelDebug
case v >= 3:
return slog.LevelInfo
case v >= 2:
return slog.LevelWarn
default:
return slog.LevelError
}
}
07070100000029000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001B00000000krelay-0.1.3/pkg/testutils0707010000002A000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001F00000000krelay-0.1.3/pkg/testutils/tcp0707010000002B000081A400000000000000000000000168B36E50000002A0000000000000000000000000000000000000002600000000krelay-0.1.3/pkg/testutils/tcp/tcp.gopackage tcp
import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
)
func NewTLSServer(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *url.URL, uint16) {
server := httptest.NewTLSServer(handler)
u, _ := url.Parse(server.URL)
port, err := strconv.ParseUint(u.Port(), 10, 16)
if err != nil {
t.Fatal(err)
}
return server, u, uint16(port)
}
func NewTCPServer(t *testing.T, handler func(conn net.Conn)) net.Listener {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
go func() {
c, err := l.Accept()
if err != nil {
t.Errorf("accept: %v", err)
return
}
handler(c)
}()
return l
}
0707010000002C000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001500000000krelay-0.1.3/pkg/xio0707010000002D000081A400000000000000000000000168B36E50000000FE000000000000000000000000000000000000001E00000000krelay-0.1.3/pkg/xio/write.gopackage xio
import (
"io"
)
func WriteFull(w io.Writer, p []byte) (n int, err error) {
size := len(p)
for n < size && err == nil {
var nw int
nw, err = w.Write(p[n:])
n += nw
}
if n == size {
return n, nil
}
return n, io.ErrShortWrite
}
0707010000002E000081A400000000000000000000000168B36E50000001F3000000000000000000000000000000000000002300000000krelay-0.1.3/pkg/xio/write_test.gopackage xio
import (
"testing"
"github.com/stretchr/testify/require"
)
type fakeWriter struct {
batchSize int
buf []byte
}
func (f *fakeWriter) Write(p []byte) (n int, err error) {
size := min(len(p), f.batchSize)
f.buf = append(f.buf, p[:size]...)
return size, nil
}
func TestWriteFull(t *testing.T) {
w := &fakeWriter{batchSize: 2}
r := require.New(t)
const msg = "0123456789"
n, err := WriteFull(w, []byte(msg))
r.NoError(err)
r.Len(msg, n)
r.Equal(msg, string(w.buf))
}
0707010000002F000041ED00000000000000000000000268B36E5000000000000000000000000000000000000000000000001600000000krelay-0.1.3/pkg/xnet07070100000030000081A400000000000000000000000168B36E5000000375000000000000000000000000000000000000001D00000000krelay-0.1.3/pkg/xnet/ack.gopackage xnet
import (
"fmt"
"io"
)
type AckCode uint8
const (
AckCodeOK = iota + 1
AckCodeUnknownError
AckCodeNoSuchHost
AckCodeResolveTimeout
AckCodeConnectTimeout
AckCodeUnknownProtocol
)
func (c AckCode) Error() string {
switch c {
case AckCodeUnknownError:
return "Unknown error"
case AckCodeNoSuchHost:
return "No such host"
case AckCodeResolveTimeout:
return "Resolve timeout"
case AckCodeConnectTimeout:
return "Connect timeout"
case AckCodeUnknownProtocol:
return "Unknown protocol"
default:
return "Unknown code"
}
}
type Acknowledgement struct {
Code AckCode
}
func (a *Acknowledgement) Marshal() []byte {
return []byte{byte(a.Code)}
}
func (a *Acknowledgement) FromReader(r io.Reader) error {
var buf [1]byte
_, err := r.Read(buf[:])
if err != nil {
return fmt.Errorf("read ack: %w", err)
}
a.Code = AckCode(buf[0])
return nil
}
07070100000031000081A400000000000000000000000168B36E5000000332000000000000000000000000000000000000001E00000000krelay-0.1.3/pkg/xnet/addr.gopackage xnet
import (
"fmt"
"net"
)
const (
AddrTypeIP byte = iota
AddrTypeHost
)
type Addr struct {
typ byte
data []byte
}
func (a *Addr) Marshal() []byte {
return a.data
}
func (a *Addr) String() string {
switch a.typ {
case AddrTypeIP:
return net.IP(a.data).String()
default:
return string(a.data)
}
}
func (a *Addr) IsZero() bool {
return len(a.data) == 0
}
func AddrFromBytes(addrType byte, data []byte) Addr {
return Addr{typ: addrType, data: data}
}
func AddrFromIP(ipStr string) (Addr, error) {
ip := net.ParseIP(ipStr)
if ip == nil {
return Addr{}, fmt.Errorf("invalid ip: %s", ipStr)
}
ipv4 := ip.To4()
if ipv4 != nil {
ip = ipv4
}
return Addr{typ: AddrTypeIP, data: ip}, nil
}
func AddrFromHost(host string) Addr {
return Addr{typ: AddrTypeHost, data: []byte(host)}
}
07070100000032000081A400000000000000000000000168B36E5000000683000000000000000000000000000000000000002300000000krelay-0.1.3/pkg/xnet/addr_test.gopackage xnet
import (
"net"
"testing"
"github.com/stretchr/testify/require"
)
func TestAddrSerialization(t *testing.T) {
testCases := map[string]struct {
getAddr func(t *testing.T) Addr
expectedBytes []byte
expectedStr string
}{
"AddrFromBytes: ipv4": {
getAddr: func(_ *testing.T) Addr {
return AddrFromBytes(AddrTypeIP, net.IPv4(192, 168, 1, 1).To4())
},
expectedBytes: []byte{192, 168, 1, 1},
expectedStr: "192.168.1.1",
},
"AddrFromBytes: ipv6": {
getAddr: func(_ *testing.T) Addr {
return AddrFromBytes(AddrTypeIP, net.IPv6linklocalallnodes)
},
expectedBytes: []byte{0xff, 0x02, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01},
expectedStr: "ff02::1",
},
"AddrFromHost": {
getAddr: func(_ *testing.T) Addr {
return AddrFromHost("www.google.com")
},
expectedBytes: []byte("www.google.com"),
expectedStr: "www.google.com",
},
"AddrFromIP: ipv4": {
getAddr: func(t *testing.T) Addr {
addr, err := AddrFromIP("192.168.1.1")
if err != nil {
t.Fatal(err)
}
return addr
},
expectedBytes: []byte{192, 168, 1, 1},
expectedStr: "192.168.1.1",
},
"AddrFromIP: ipv6": {
getAddr: func(t *testing.T) Addr {
addr, err := AddrFromIP("ff02::1")
if err != nil {
t.Fatal(err)
}
return addr
},
expectedBytes: []byte{0xff, 0x02, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01},
expectedStr: "ff02::1",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
addr := tc.getAddr(t)
r.Equal(tc.expectedBytes, addr.Marshal())
r.Equal(tc.expectedStr, addr.String())
})
}
}
07070100000033000081A400000000000000000000000168B36E50000007F3000000000000000000000000000000000000002000000000krelay-0.1.3/pkg/xnet/header.gopackage xnet
import (
"encoding/binary"
"fmt"
"io"
"math/rand"
)
const (
lengthAllMandatoryFields = 12 // 1(version) + 2(total length) + 5(request id) + 1(protocol) + 2(port) + 1(addr type)
lengthRequestID = 5
)
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
func NewRequestID() string {
b := make([]rune, lengthRequestID)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
type Header struct {
Version byte
RequestID string
Protocol byte
Port uint16
Addr Addr
}
func (h *Header) Marshal() []byte {
addrBytes := h.Addr.Marshal()
totalLen := lengthAllMandatoryFields + len(addrBytes)
buf := make([]byte, totalLen)
cursor := 0
buf[cursor] = h.Version
cursor++
binary.BigEndian.PutUint16(buf[cursor:cursor+2], uint16(totalLen))
cursor += 2
copy(buf[cursor:cursor+lengthRequestID], h.RequestID)
cursor += lengthRequestID
buf[cursor] = h.Protocol
cursor++
binary.BigEndian.PutUint16(buf[cursor:cursor+2], h.Port)
cursor += 2
buf[cursor] = h.Addr.typ
cursor++
copy(buf[cursor:], addrBytes)
return buf
}
func (h *Header) FromReader(r io.Reader) error {
var lengthBuf [3]byte
_, err := io.ReadFull(r, lengthBuf[:])
if err != nil {
return fmt.Errorf("read total length: %w", err)
}
h.Version = lengthBuf[0]
// TODO: handle different versions
totalLen := binary.BigEndian.Uint16(lengthBuf[1:])
if totalLen < lengthAllMandatoryFields {
return fmt.Errorf("body too short: %d", totalLen)
}
bodyBuf := make([]byte, totalLen-3)
_, err = io.ReadFull(r, bodyBuf)
if err != nil {
return fmt.Errorf("read full body: %w", err)
}
cursor := 0
reqIDBytes := bodyBuf[cursor : cursor+lengthRequestID]
cursor += lengthRequestID
proto := bodyBuf[cursor]
cursor++
port := binary.BigEndian.Uint16(bodyBuf[cursor : cursor+2])
cursor += 2
h.RequestID = string(reqIDBytes)
h.Protocol = proto
h.Port = port
h.Addr = AddrFromBytes(bodyBuf[cursor], bodyBuf[cursor+1:])
return nil
}
07070100000034000081A400000000000000000000000168B36E5000000607000000000000000000000000000000000000002500000000krelay-0.1.3/pkg/xnet/header_test.gopackage xnet
import (
"bytes"
"net"
"testing"
"github.com/stretchr/testify/require"
)
var fakeRequestID = "00000"
var headerCases = map[string]struct {
hdr Header
bytes []byte
}{
"host": {
hdr: Header{
Version: 1,
RequestID: fakeRequestID,
Protocol: ProtocolTCP,
Port: 80,
Addr: AddrFromHost("a.com"),
},
bytes: []byte{
1,
0, 0x11,
0x30, 0x30, 0x30, 0x30, 0x30,
0,
0, 80,
1,
97, 46, 99, 111, 109,
},
},
"ipv4": {
hdr: Header{
Version: 0,
RequestID: fakeRequestID,
Protocol: ProtocolUDP,
Port: 53,
Addr: AddrFromBytes(AddrTypeIP, net.IPv4(192, 168, 1, 1).To4()),
},
bytes: []byte{
0,
0, 0x10,
0x30, 0x30, 0x30, 0x30, 0x30,
1,
0, 53,
0,
192, 168, 1, 1,
},
},
"ipv6": {
hdr: Header{
Version: 0,
RequestID: fakeRequestID,
Protocol: ProtocolTCP,
Port: 8080,
Addr: AddrFromBytes(AddrTypeIP, net.IPv6loopback),
},
bytes: []byte{
0,
0, 0x1c,
0x30, 0x30, 0x30, 0x30, 0x30,
0,
0x1f, 0x90,
0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1,
},
},
}
func TestHeaderMarshal(t *testing.T) {
for name, tc := range headerCases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.bytes, tc.hdr.Marshal())
})
}
}
func TestHeaderUnmarshal(t *testing.T) {
for name, tc := range headerCases {
t.Run(name, func(t *testing.T) {
r := require.New(t)
got := Header{}
r.NoError(got.FromReader(bytes.NewBuffer(tc.bytes)))
r.Equal(tc.hdr, got)
})
}
}
07070100000035000081A400000000000000000000000168B36E500000003E000000000000000000000000000000000000001F00000000krelay-0.1.3/pkg/xnet/proto.gopackage xnet
const (
ProtocolTCP byte = iota
ProtocolUDP
)
07070100000036000081A400000000000000000000000168B36E5000000834000000000000000000000000000000000000001D00000000krelay-0.1.3/pkg/xnet/tcp.gopackage xnet
import (
"io"
"log/slog"
"net"
"github.com/knight42/krelay/pkg/constants"
)
var tcpPool = newBufferPool(constants.TCPBufferSize)
// This does the actual data transfer.
// The broker only closes the Read side.
func tcpBroker(dst, src net.Conn, srcClosed chan struct{}) {
defer src.Close()
bufPtr := tcpPool.Get().(*[]byte)
defer tcpPool.Put(bufPtr)
buf := *bufPtr
// We can handle errors in a finer-grained manner by inlining io.Copy (it's
// simple, and we drop the ReaderFrom or WriterTo checks for
// net.Conn->net.Conn transfers, which aren't needed). This would also let
// us adjust buffer size.
_, _ = io.CopyBuffer(dst, src, buf)
close(srcClosed)
}
// ProxyTCP is excerpt from https://stackoverflow.com/a/27445109/4725840
func ProxyTCP(reqID string, downConn, upConn *net.TCPConn) {
l := slog.With(slog.String(constants.LogFieldRequestID, reqID))
defer l.Debug("ProxyTCP exit")
// channels to wait on the close event for each connection
upClosed := make(chan struct{})
downClosed := make(chan struct{})
go tcpBroker(upConn, downConn, downClosed)
go tcpBroker(downConn, upConn, upClosed)
// wait for one half of the proxy to exit, then trigger a shutdown of the
// other half by calling CloseRead(). This will break the read loop in the
// broker and allow us to fully close the connection cleanly without a
// "use of closed network connection" error.
var waitFor chan struct{}
select {
case <-downClosed:
l.Debug("Client close connection")
// the client closed first and any more packets from the server aren't
// useful, so we can optionally SetLinger(0) here to recycle the port
// faster.
_ = upConn.SetLinger(0)
_ = upConn.CloseRead()
waitFor = upClosed
case <-upClosed:
l.Debug("Server close connection")
_ = downConn.CloseRead()
waitFor = downClosed
}
// Wait for the other connection to close.
// This "waitFor" pattern isn't required, but gives us a way to track the
// connection and ensure all copies terminate correctly; we can trigger
// stats on entry and deferred exit of this function.
<-waitFor
}
07070100000037000081A400000000000000000000000168B36E50000003C5000000000000000000000000000000000000002200000000krelay-0.1.3/pkg/xnet/tcp_test.gopackage xnet
import (
"context"
"io"
"net"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/testutils/tcp"
)
func TestProxyHTTPS(t *testing.T) {
const msg = "Hello, World!"
ts, tsURL, _ := tcp.NewTLSServer(t, func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(msg))
})
defer ts.Close()
dialer := net.Dialer{Timeout: time.Second * 10}
l := tcp.NewTCPServer(t, func(c net.Conn) {
upConn, err := dialer.DialContext(context.Background(), constants.ProtocolTCP, tsURL.Host)
if err != nil {
t.Errorf("dial upstream: %v", err)
return
}
ProxyTCP("req-id", c.(*net.TCPConn), upConn.(*net.TCPConn))
})
defer l.Close()
r := require.New(t)
resp, err := ts.Client().Get("https://" + l.Addr().String())
r.NoError(err)
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
r.NoError(err)
r.Equal(msg, string(body))
}
07070100000038000081A400000000000000000000000168B36E50000009C0000000000000000000000000000000000000001D00000000krelay-0.1.3/pkg/xnet/udp.gopackage xnet
import (
"encoding/binary"
"io"
"log/slog"
"net"
"time"
"github.com/knight42/krelay/pkg/alarm"
"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/xio"
)
// UDPConn wraps a *net.UDPConn and overrides the Read and ReadFrom methods to
// automatically prepend the length of the packet.
type UDPConn struct {
*net.UDPConn
}
func (c *UDPConn) ReadFrom(buf []byte) (n int, addr net.Addr, err error) {
n, addr, err = c.UDPConn.ReadFrom(buf[2:])
binary.BigEndian.PutUint16(buf[:2], uint16(n))
return n + 2, addr, err
}
func (c *UDPConn) Read(buf []byte) (n int, err error) {
n, err = c.UDPConn.Read(buf[2:])
binary.BigEndian.PutUint16(buf[:2], uint16(n))
return n + 2, err
}
func ReadUDPFromStream(r io.Reader, buf []byte, timeout time.Duration) (n int, err error) {
var lengthBuf [2]byte
_, err = readFullWithTimeout(r, lengthBuf[:], timeout)
if err != nil {
return 0, err
}
// assume the capacity of the buffer is big enough
length := binary.BigEndian.Uint16(lengthBuf[:])
return readFullWithTimeout(r, buf[:length], timeout)
}
var udpPool = newBufferPool(constants.UDPBufferSize)
func ProxyUDP(reqID string, downConn *net.TCPConn, upConn net.Conn) {
l := slog.With(slog.String(constants.LogFieldRequestID, reqID))
defer l.Debug("ProxyUDP exit")
downClosed := make(chan struct{})
upClosed := make(chan struct{})
const idleTimeout = time.Second * 110
a := alarm.New(idleTimeout)
a.Start()
go func() {
bufPtr := udpPool.Get().(*[]byte)
defer udpPool.Put(bufPtr)
defer close(downClosed)
buf := *bufPtr
for {
n, err := ReadUDPFromStream(downConn, buf, time.Second*5)
if err != nil {
if isTimeoutError(err) && !a.Done() {
continue
}
return
}
_, err = xio.WriteFull(upConn, buf[:n])
if err != nil {
return
}
a.Reset()
}
}()
go func() {
bufPtr := udpPool.Get().(*[]byte)
defer udpPool.Put(bufPtr)
defer close(upClosed)
buf := *bufPtr
for {
n, err := readConnWithTimeout(upConn, buf, time.Second*5)
if err != nil {
if isTimeoutError(err) && !a.Done() {
continue
}
return
}
_, err = xio.WriteFull(downConn, buf[:n])
if err != nil {
return
}
a.Reset()
}
}()
var waitFor chan struct{}
select {
case <-downClosed:
l.Debug("Client close connection")
_ = upConn.Close()
waitFor = upClosed
case <-upClosed:
l.Debug("Server close connection")
_ = downConn.CloseRead()
waitFor = downClosed
}
<-waitFor
}
07070100000039000081A400000000000000000000000168B36E5000000569000000000000000000000000000000000000002200000000krelay-0.1.3/pkg/xnet/udp_test.gopackage xnet
import (
"bytes"
"net"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestReadUDPFromStream(t *testing.T) {
buf := make([]byte, 10)
data := bytes.NewBuffer([]byte{0x00, 0x03, 0x30, 0x31, 0x32})
r := require.New(t)
n, err := ReadUDPFromStream(data, buf, 0)
r.NoError(err)
r.Equal(3, n)
r.Equal(buf[:n], []byte("012"))
}
func TestUDPConn(t *testing.T) {
r := require.New(t)
serverConn, err := net.ListenPacket("udp", "127.0.0.1:0")
r.NoError(err)
defer serverConn.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
svrUDPConn := &UDPConn{UDPConn: serverConn.(*net.UDPConn)}
buf := make([]byte, 10)
n, cliAddr, err := svrUDPConn.ReadFrom(buf)
if err != nil {
t.Errorf("ReadFrom: %v", err)
return
}
if !assert.Equal(t, 3, n) {
return
}
if !assert.Equal(t, []byte{0x0, 0x1, 0x0}, buf[:n]) {
return
}
_, err = serverConn.WriteTo([]byte{0x01, 0x02, 0x03}, cliAddr)
if err != nil {
t.Errorf("WriteTo: %v", err)
return
}
}()
clientConn, err := net.Dial("udp", serverConn.LocalAddr().String())
r.NoError(err)
uc := UDPConn{UDPConn: clientConn.(*net.UDPConn)}
_, err = uc.Write([]byte{0x00})
r.NoError(err)
buf := make([]byte, 10)
n, err := uc.Read(buf)
r.NoError(err)
r.Equal([]byte{0x0, 0x3, 0x1, 0x2, 0x3}, buf[:n])
wg.Wait()
}
0707010000003A000081A400000000000000000000000168B36E50000003F3000000000000000000000000000000000000001F00000000krelay-0.1.3/pkg/xnet/utils.gopackage xnet
import (
"errors"
"io"
"net"
"strconv"
"sync"
"time"
)
func readConnWithTimeout(c net.Conn, buf []byte, timeout time.Duration) (n int, err error) {
setReadDeadline(c, timeout)
return c.Read(buf)
}
func isTimeoutError(err error) bool {
netErr, ok := err.(net.Error)
return ok && netErr.Timeout()
}
func readFullWithTimeout(r io.Reader, buf []byte, timeout time.Duration) (int, error) {
conn, ok := r.(net.Conn)
if ok {
setReadDeadline(conn, timeout)
return io.ReadFull(conn, buf)
}
return io.ReadFull(r, buf)
}
func setReadDeadline(c net.Conn, timeout time.Duration) {
if timeout > 0 {
_ = c.SetReadDeadline(time.Now().Add(timeout))
}
}
func JoinHostPort(host string, port uint16) string {
return net.JoinHostPort(host, strconv.Itoa(int(port)))
}
func IsClosedConnectionError(err error) bool {
return errors.Is(err, net.ErrClosed)
}
func newBufferPool(size int) sync.Pool {
return sync.Pool{
New: func() any {
buf := make([]byte, size)
return &buf
},
}
}
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!232 blocks