Files
proxy-lite/internal/app/app.go
2026-03-30 21:26:53 -04:00

502 lines
11 KiB
Go

package app
import (
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"strings"
	"sync"
	"time"

	"infinite-noodle/internal/noodle"
	"infinite-noodle/internal/web"
)
// Config carries the settings Run needs to start the application.
type Config struct {
	// DataPath is passed to noodle.NewDatabase to open the noodle store.
	DataPath string
	// Host is the host part of the address the HTTP server listens on.
	Host string
	// Port is the port part of the address the HTTP server listens on.
	Port int
	// RunTest makes Run call runTestSequence on the database before serving.
	RunTest bool
}
// Run opens the noodle database at cfg.DataPath, optionally seeds it with
// test data, starts the background goroutines (systemCheck, expirationCheck
// and proxify) and serves the web UI on cfg.Host:cfg.Port.
//
// It blocks for as long as the HTTP server runs and returns the error reported
// by http.ListenAndServe. The database is closed when Run returns.
func Run(cfg Config) error {
	// net.JoinHostPort brackets IPv6 literals ("[::1]:8080"), which a plain
	// "%s:%d" format does not. Brackets the caller already supplied are
	// stripped first so they are not doubled.
	host := strings.TrimSuffix(strings.TrimPrefix(cfg.Host, "["), "]")
	listenAddr := net.JoinHostPort(host, fmt.Sprint(cfg.Port))

	noodleChannel := make(chan noodle.Noodle)
	db := noodle.NewDatabase(cfg.DataPath)
	defer db.Close()

	if cfg.RunTest {
		runTestSequence(db)
	}

	// Background workers: periodic resync, expiration countdown, and the
	// goroutine that starts/stops proxies from what arrives on the channel.
	go systemCheck(db, noodleChannel)
	go expirationCheck(db, noodleChannel)
	go proxify(noodleChannel)

	http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(web.StaticFiles()))))
	http.HandleFunc("/", web.HandleMain(db, &noodleChannel))
	http.HandleFunc("/add", web.HandleAdd(db, &noodleChannel))
	http.HandleFunc("/toggle", web.HandleToggle(db, &noodleChannel))
	http.HandleFunc("/delete", web.HandleDelete(db, &noodleChannel))

	log.Printf("Server starting on %s", listenAddr)
	return http.ListenAndServe(listenAddr, nil)
}
// systemCheck sends every noodle currently returned by db.GetAll to
// noodleChannel, waits 60 seconds, and repeats. It never returns.
func systemCheck(db *noodle.Database, noodleChannel chan noodle.Noodle) {
	const resyncInterval = 60 * time.Second

	// The sleep lives in the loop's post statement, so it runs after each
	// full pass over the database.
	for ; ; time.Sleep(resyncInterval) {
		for _, entry := range db.GetAll() {
			noodleChannel <- entry
		}
	}
}
// proxify consumes noodles from noodleChannel and keeps one running proxy per
// noodle Id in step with the noodle's IsUp flag: a noodle that is up but has
// no proxy gets one started, and a noodle that is down but still has a proxy
// gets it closed. Any other combination is ignored. It never returns.
func proxify(noodleChannel chan noodle.Noodle) {
	active := make(map[string]managedProxy)

	for {
		item := <-noodleChannel
		current, running := active[item.Id]

		switch {
		case item.IsUp && !running:
			if started, err := newProxy(item); err != nil {
				log.Print(err)
			} else {
				active[item.Id] = started
				go started.Run()
			}

		case !item.IsUp && running:
			log.Printf("Closing noodle=%v", item)
			if err := current.Close(); err != nil {
				log.Print(err)
			}
			delete(active, item.Id)
		}
	}
}
// managedProxy is the lifecycle contract proxify relies on for every running
// proxy, regardless of transport.
type managedProxy interface {
	// Run serves traffic; proxify invokes it on its own goroutine.
	Run()
	// Close shuts the proxy down and reports any error from doing so.
	Close() error
}
// newProxy builds the proxy described by item. It listens on
// 0.0.0.0:item.ListenPort and forwards to item.DestHost:item.DestPort; traffic
// is restricted to the source given by item.Src. The transport follows
// normalizeProto(item.Proto): UDP for "UDP", TCP for everything else.
//
// On failure it returns a nil managedProxy together with the constructor's
// error.
func newProxy(item noodle.Noodle) (managedProxy, error) {
	listenAddr := fmt.Sprintf("0.0.0.0:%d", item.ListenPort)

	// net.JoinHostPort brackets IPv6 literal destinations so the address can
	// be dialed; brackets that are already present are stripped first so they
	// are not doubled.
	destHost := strings.TrimSuffix(strings.TrimPrefix(item.DestHost, "["), "]")
	targetAddr := net.JoinHostPort(destHost, fmt.Sprint(item.DestPort))

	proto := normalizeProto(item.Proto)
	log.Printf("Starting a %s noodle from %s to %s with source=%s", proto, listenAddr, targetAddr, item.Src)

	// Return a literal nil on error: handing back the constructor's nil
	// pointer directly would wrap it in a non-nil interface value.
	if proto == "UDP" {
		proxy, err := newUDPNoodleProxy(listenAddr, targetAddr, item.Src)
		if err != nil {
			return nil, err
		}
		return proxy, nil
	}

	proxy, err := newTCPNoodleProxy(listenAddr, targetAddr, item.Src)
	if err != nil {
		return nil, err
	}
	return proxy, nil
}
// normalizeProto maps a protocol name to "UDP" or "TCP". Surrounding
// whitespace and letter case are ignored; anything that is not UDP, including
// the empty string, is reported as "TCP".
func normalizeProto(proto string) string {
	if strings.ToUpper(strings.TrimSpace(proto)) == "UDP" {
		return "UDP"
	}
	return "TCP"
}
// tcpNoodleProxy forwards TCP connections accepted on listener to targetAddr.
type tcpNoodleProxy struct {
	// listenAddr is the address the listener was opened on; also used in logs.
	listenAddr string
	// targetAddr is dialed once for every accepted connection.
	targetAddr string
	// allowedIP is the source filter handed to allowSource.
	allowedIP string
	// listener accepts the incoming client connections.
	listener net.Listener

	// mu guards conns.
	mu sync.Mutex
	// conns is the set of live connections that Close tears down.
	conns map[net.Conn]struct{}
}
// newTCPNoodleProxy opens a TCP listener on listenAddr and returns a proxy that
// will forward accepted connections to targetAddr, filtered by allowedIP. The
// listener is already open when this returns; call Run to start accepting.
func newTCPNoodleProxy(listenAddr, targetAddr, allowedIP string) (*tcpNoodleProxy, error) {
	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return nil, err
	}

	proxy := &tcpNoodleProxy{
		listenAddr: listenAddr,
		targetAddr: targetAddr,
		allowedIP:  allowedIP,
		listener:   listener,
		conns:      make(map[net.Conn]struct{}),
	}
	return proxy, nil
}
// Run accepts connections until Accept fails, handing each one to handleConn
// on its own goroutine. Any Accept error ends the loop; it is logged unless it
// is the "closed" error produced by shutting the listener down.
func (p *tcpNoodleProxy) Run() {
	for {
		client, err := p.listener.Accept()
		if err == nil {
			go p.handleConn(client)
			continue
		}

		if !isClosedNetworkError(err) {
			log.Printf("Accept failed for %s: %v", p.listenAddr, err)
		}
		return
	}
}
// handleConn proxies one accepted client connection. Connections from a source
// that allowSource rejects are closed immediately. Otherwise it dials
// targetAddr and copies bytes in both directions until both copies finish,
// then closes both connections.
func (p *tcpNoodleProxy) handleConn(src net.Conn) {
	if !allowSource(p.allowedIP, src.RemoteAddr()) {
		log.Printf("Rejected noodle connection from %s; allowed source is %s", src.RemoteAddr().String(), p.allowedIP)
		src.Close()
		return
	}

	// Track the client before dialing so that Close can terminate it even
	// while the backend dial is still in progress.
	p.trackConn(src)
	defer p.untrackAndClose(src)

	dst, err := net.Dial("tcp", p.targetAddr)
	if err != nil {
		log.Printf("Dial failed from %s to %s: %v", p.listenAddr, p.targetAddr, err)
		return
	}
	p.trackConn(dst)
	defer p.untrackAndClose(dst)

	var wg sync.WaitGroup
	wg.Add(2)

	// client -> backend
	go func() {
		defer wg.Done()
		if _, err := io.Copy(dst, src); err != nil && !isClosedNetworkError(err) {
			log.Printf("Proxy copy src->dst failed for %s to %s: %v", p.listenAddr, p.targetAddr, err)
		}
		// Half-close so the backend sees EOF while the reverse direction
		// keeps flowing.
		closeWrite(dst)
	}()

	// backend -> client
	go func() {
		defer wg.Done()
		if _, err := io.Copy(src, dst); err != nil && !isClosedNetworkError(err) {
			log.Printf("Proxy copy dst->src failed for %s to %s: %v", p.targetAddr, p.listenAddr, err)
		}
		closeWrite(src)
	}()

	wg.Wait()
}
// Close stops the listener and then closes every connection currently tracked
// by the proxy. Only the listener's close error is returned; errors from the
// individual connections are discarded.
func (p *tcpNoodleProxy) Close() error {
	listenerErr := p.listener.Close()

	p.mu.Lock()
	for tracked := range p.conns {
		_ = tracked.Close()
	}
	p.mu.Unlock()

	return listenerErr
}
// trackConn records conn in the set of live connections so Close can reach it.
func (p *tcpNoodleProxy) trackConn(conn net.Conn) {
	p.mu.Lock()
	p.conns[conn] = struct{}{}
	p.mu.Unlock()
}
// untrackAndClose drops conn from the set of live connections and closes it.
// The close happens after the lock is released, and its error is discarded.
func (p *tcpNoodleProxy) untrackAndClose(conn net.Conn) {
	func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		delete(p.conns, conn)
	}()
	_ = conn.Close()
}
// udpNoodleProxy relays UDP datagrams received on conn to targetAddr, keeping
// one backend socket per client address.
type udpNoodleProxy struct {
	// listenAddr is the address conn was opened on; also used in logs.
	listenAddr string
	// targetAddr is resolved and dialed whenever a new session is created.
	targetAddr string
	// allowedIP is the source filter handed to allowSource.
	allowedIP string
	// conn is the client-facing socket.
	conn *net.UDPConn

	// mu guards sessions, closed, and each session's lastSeen.
	mu sync.Mutex
	// sessions maps a client address string to that client's session.
	sessions map[string]*udpSession
	// closed is set by Close; it makes later Close calls no-ops and tells
	// cleanupIdleSessions to stop.
	closed bool
}
// udpSession ties one client address to the backend socket that carries its
// traffic.
type udpSession struct {
	// clientAddr is where datagrams read from backend are sent back to.
	clientAddr *net.UDPAddr
	// backend is the socket dialed to the proxy's target for this client.
	backend *net.UDPConn
	// lastSeen is the time of the most recent traffic on the session; it is
	// compared against udpSessionIdleTimeout to expire idle sessions.
	lastSeen time.Time
}
// udpSessionIdleTimeout is how long a UDP session may go without traffic in
// either direction before cleanupIdleSessions removes it.
const udpSessionIdleTimeout = 2 * time.Minute
// newUDPNoodleProxy binds a UDP socket on listenAddr and returns a proxy that
// will relay datagrams to targetAddr, filtered by allowedIP. The socket is
// already bound when this returns; call Run to start relaying.
func newUDPNoodleProxy(listenAddr, targetAddr, allowedIP string) (*udpNoodleProxy, error) {
	local, err := net.ResolveUDPAddr("udp", listenAddr)
	if err != nil {
		return nil, err
	}

	socket, err := net.ListenUDP("udp", local)
	if err != nil {
		return nil, err
	}

	proxy := &udpNoodleProxy{
		listenAddr: listenAddr,
		targetAddr: targetAddr,
		allowedIP:  allowedIP,
		conn:       socket,
		sessions:   make(map[string]*udpSession),
	}
	return proxy, nil
}
// Run starts the idle-session cleaner and then reads datagrams from the
// client-facing socket until a read fails. Each datagram from an allowed
// source is forwarded over that client's backend session, which is created on
// first use. A read error ends the loop; it is logged unless it is the
// "closed" error produced by shutting the socket down.
func (p *udpNoodleProxy) Run() {
	go p.cleanupIdleSessions()

	buf := make([]byte, 65535)
	for {
		n, clientAddr, err := p.conn.ReadFromUDP(buf)
		if err != nil {
			if !isClosedNetworkError(err) {
				log.Printf("UDP read failed for %s: %v", p.listenAddr, err)
			}
			return
		}

		if !allowSource(p.allowedIP, clientAddr) {
			log.Printf("Rejected noodle datagram from %s; allowed source is %s", clientAddr.String(), p.allowedIP)
			continue
		}

		session, err := p.sessionFor(clientAddr)
		if err != nil {
			log.Printf("UDP session setup failed for %s to %s: %v", clientAddr.String(), p.targetAddr, err)
			continue
		}

		// Write is synchronous and must not retain the slice, so the read
		// buffer is forwarded directly instead of being copied per datagram.
		if _, err := session.backend.Write(buf[:n]); err != nil && !isClosedNetworkError(err) {
			log.Printf("UDP write failed from %s to %s: %v", clientAddr.String(), p.targetAddr, err)
			p.removeSession(clientAddr.String(), true)
		}
	}
}
// sessionFor returns the session for clientAddr, creating it if none exists.
// An existing session has its lastSeen refreshed. A new session dials
// targetAddr and starts a relayBackend goroutine for the return path.
//
// It returns net.ErrClosed if the proxy was closed while the session was being
// set up, or the resolve/dial error if the backend could not be reached.
func (p *udpNoodleProxy) sessionFor(clientAddr *net.UDPAddr) (*udpSession, error) {
	key := clientAddr.String()

	p.mu.Lock()
	if session, ok := p.sessions[key]; ok {
		session.lastSeen = time.Now()
		p.mu.Unlock()
		return session, nil
	}
	p.mu.Unlock()

	// Resolve and dial without holding the lock; both may block.
	targetAddr, err := net.ResolveUDPAddr("udp", p.targetAddr)
	if err != nil {
		return nil, err
	}
	backend, err := net.DialUDP("udp", nil, targetAddr)
	if err != nil {
		return nil, err
	}
	session := &udpSession{
		clientAddr: clientAddr,
		backend:    backend,
		lastSeen:   time.Now(),
	}

	p.mu.Lock()
	if p.closed {
		// Close has already swapped out the session map; registering now
		// would leak the backend socket and its relay goroutine.
		p.mu.Unlock()
		_ = backend.Close()
		return nil, net.ErrClosed
	}
	if existing, ok := p.sessions[key]; ok {
		// Another caller registered a session while this one was dialing.
		p.mu.Unlock()
		_ = backend.Close()
		return existing, nil
	}
	p.sessions[key] = session
	p.mu.Unlock()

	go p.relayBackend(session)
	return session, nil
}
// relayBackend copies datagrams from the session's backend socket back to the
// session's client until a read or write fails. On exit it always closes the
// backend socket and unregisters the session, but only if the map entry for
// the client still refers to this session. Errors are logged unless they are
// the "closed" error produced by a shutdown.
func (p *udpNoodleProxy) relayBackend(session *udpSession) {
	key := session.clientAddr.String()

	defer func() {
		p.mu.Lock()
		// Remove by identity, not just by key: a newer session for the same
		// client may already have replaced this one.
		if p.sessions[key] == session {
			delete(p.sessions, key)
		}
		p.mu.Unlock()
		// Always release the socket, including after non-"closed" read
		// errors; closing an already closed socket is harmless.
		_ = session.backend.Close()
	}()

	buf := make([]byte, 65535)
	for {
		n, err := session.backend.Read(buf)
		if err != nil {
			if !isClosedNetworkError(err) {
				log.Printf("UDP backend read failed for %s: %v", p.targetAddr, err)
			}
			return
		}

		if _, err := p.conn.WriteToUDP(buf[:n], session.clientAddr); err != nil {
			if !isClosedNetworkError(err) {
				log.Printf("UDP client write failed for %s: %v", session.clientAddr.String(), err)
			}
			return
		}

		// Return traffic counts as activity for the idle timeout.
		p.mu.Lock()
		if p.sessions[key] == session {
			session.lastSeen = time.Now()
		}
		p.mu.Unlock()
	}
}
// cleanupIdleSessions wakes every 30 seconds and removes, closing its backend,
// every session whose lastSeen is older than udpSessionIdleTimeout. It returns
// on the first wake-up after the proxy has been closed.
func (p *udpNoodleProxy) cleanupIdleSessions() {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		<-tick.C
		if p.isClosed() {
			return
		}

		deadline := time.Now().Add(-udpSessionIdleTimeout)

		// Collect keys under the lock; removal happens afterwards because
		// removeSession takes the lock itself.
		var stale []string
		p.mu.Lock()
		for key, session := range p.sessions {
			if !session.lastSeen.Before(deadline) {
				continue
			}
			stale = append(stale, key)
		}
		p.mu.Unlock()

		for i := range stale {
			p.removeSession(stale[i], true)
		}
	}
}
// removeSession unregisters the session stored under key, if there is one.
// When closeBackend is true the removed session's backend socket is closed as
// well, after the lock has been released; the close error is discarded.
func (p *udpNoodleProxy) removeSession(key string, closeBackend bool) {
	p.mu.Lock()
	session, found := p.sessions[key]
	delete(p.sessions, key) // no-op when the key is absent
	p.mu.Unlock()

	if !found || !closeBackend {
		return
	}
	_ = session.backend.Close()
}
// isClosed reports whether Close has been called, reading the flag under the
// proxy's mutex.
func (p *udpNoodleProxy) isClosed() bool {
	p.mu.Lock()
	closed := p.closed
	p.mu.Unlock()
	return closed
}
// Close marks the proxy closed, closes every session's backend socket and
// then the client-facing socket, returning that last close's error. Calls
// after the first do nothing and return nil.
func (p *udpNoodleProxy) Close() error {
	p.mu.Lock()
	first := !p.closed
	p.closed = true
	orphaned := p.sessions
	if first {
		// Detach the sessions so they can be closed without the lock held.
		p.sessions = make(map[string]*udpSession)
	}
	listener := p.conn
	p.mu.Unlock()

	if !first {
		return nil
	}

	for _, session := range orphaned {
		_ = session.backend.Close()
	}

	if listener != nil {
		return listener.Close()
	}
	return nil
}
// allowSource reports whether traffic from addr is permitted by allowedIP.
//
// An empty allowedIP or the literal "All" permits every source. Otherwise the
// host part of addr must match allowedIP, either as the identical string or,
// when both parse as IP addresses, as the same address. The second rule makes
// equivalent spellings match, such as an expanded IPv6 form or an IPv4-mapped
// IPv6 address against its IPv4 form. A nil addr, or one whose string form is
// not host:port, is rejected.
func allowSource(allowedIP string, addr net.Addr) bool {
	if allowedIP == "" || allowedIP == "All" {
		return true
	}
	if addr == nil {
		return false
	}

	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return false
	}
	if host == allowedIP {
		return true
	}

	// Fall back to a semantic comparison so that different textual forms of
	// one address are treated as equal.
	allowed, remote := net.ParseIP(allowedIP), net.ParseIP(host)
	return allowed != nil && remote != nil && allowed.Equal(remote)
}
// closeWrite shuts down the write side of conn when it is a *net.TCPConn,
// leaving the read side open. Other connection types are left untouched, and
// the CloseWrite error is discarded.
func closeWrite(conn net.Conn) {
	tcpConn, isTCP := conn.(*net.TCPConn)
	if !isTCP {
		return
	}
	_ = tcpConn.CloseWrite()
}
// isClosedNetworkError reports whether err means an operation was attempted on
// a network connection that has already been closed.
//
// It matches net.ErrClosed anywhere in err's wrap chain. As a fallback it also
// matches errors whose message contains "use of closed network connection",
// for errors that carry the text without wrapping net.ErrClosed. A nil err
// yields false.
func isClosedNetworkError(err error) bool {
	if err == nil {
		return false
	}
	// errors.Is walks the wrap chain, so wrapped occurrences of net.ErrClosed
	// match as well; a plain == comparison would miss them.
	if errors.Is(err, net.ErrClosed) {
		return true
	}
	return strings.Contains(err.Error(), "use of closed network connection")
}
// expirationCheck runs once per second and counts down the Expiration of every
// noodle that is up. Noodles that are down are skipped.
//
// A noodle whose Expiration is still positive after losing one second is just
// saved back. A noodle that reaches zero is saved with Expiration 0, announced
// on noodleChannel with IsUp set to false, and deleted from db; if that save
// fails the teardown is skipped for this tick. A noodle that was already at or
// below zero is announced and deleted without being saved first. Database
// errors are logged and do not stop the loop.
func expirationCheck(db *noodle.Database, noodleChannel chan noodle.Noodle) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		for _, item := range db.GetAll() {
			if !item.IsUp {
				continue
			}

			if item.Expiration > 0 {
				item.Expiration -= time.Second
				stillAlive := item.Expiration > 0
				if !stillAlive {
					item.Expiration = 0
				}

				// Persist the new countdown value. A failed save ends
				// this noodle's processing for the tick either way.
				if err := db.Update(item); err != nil {
					log.Print(err)
					continue
				}
				if stillAlive {
					continue
				}
			}

			// Expired: announce it as down, then remove it from the store.
			item.IsUp = false
			noodleChannel <- item
			if err := db.Delete(item.Id); err != nil {
				log.Print(err)
			}
		}
	}
}
// runTestSequence adds 21 TCP test noodles to db. They listen on consecutive
// ports starting at 1080 and all forward to localhost:22 with no source
// restriction. Each noodle's Expiration is the current second-of-minute at
// creation time, expressed in seconds. Every addition is logged together with
// a lookup of the new id and a dump of the database.
func runTestSequence(db *noodle.Database) {
	const (
		testCount = 21
		basePort  = 1080
	)

	for offset := 0; offset < testCount; offset++ {
		item := noodle.Noodle{
			Id:         db.MakeID(),
			Name:       "Name_Test",
			Proto:      "TCP",
			Src:        "All",
			ListenPort: basePort + offset,
			DestPort:   22,
			DestHost:   "localhost",
			Expiration: time.Duration(time.Now().Second()) * time.Second,
			IsUp:       true,
		}

		log.Printf("Test noodle=%v", item)
		db.Add(item)
		log.Printf("Test id=%s exists=%t", item.Id, db.Handle.Has(item.Id))
		log.Printf("Test GetAllGeneric=%v", db.GetAllGeneric())
	}
}