Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions app/reverse/portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) err
}

if isDomain(ob.Target, p.domain) {
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
opts := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...)

muxClient, err := mux.NewClientWorker(transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
}, mux.ClientStrategy{})
if err != nil {
return errors.New("failed to create mux client worker").Base(err).AtWarning()
}
Expand All @@ -84,11 +91,24 @@ func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) err

p.picker.AddWorker(worker)

if _, ok := link.Reader.(*pipe.Reader); !ok {
select {
case <-ctx.Done():
case <-muxClient.WaitClosed():
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
requestDone := func() error {
if err := buf.Copy(link.Reader, inboundLink.Writer); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
}
responseDone := func() error {
if err := buf.Copy(inboundLink.Reader, link.Writer); err != nil {
return err
}
return nil
}
requestDonePost := task.OnSuccess(requestDone, task.Close(inboundLink.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(inboundLink.Reader)
common.Interrupt(inboundLink.Writer)
return errors.New("connection ends").Base(err)
}
return nil
}
Expand Down
43 changes: 37 additions & 6 deletions common/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ import (
"io"
"time"

"github.com/xtls/xray-core/app/dispatcher"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/log"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
//"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/core"
//"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/pipe"
Expand Down Expand Up @@ -64,14 +66,43 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
if dest.Address != muxCoolAddress {
return s.dispatcher.DispatchLink(ctx, dest, link)
}
link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link)
worker, err := NewServerWorker(ctx, s.dispatcher, link)

// For Mux, we need to use pipe to guard against multiple sub-connections writing back responses at the same time
// sessionPolicy = h.policyManager.ForLevel(request.User.Level)
// ctx, cancel := context.WithCancel(ctx)
// timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
// ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
opts := pipe.OptionsFromContext(ctx)
uplinkReader, uplinkWriter := pipe.New(opts...)
downlinkReader, downlinkWriter := pipe.New(opts...)

_, err := NewServerWorker(ctx, s.dispatcher, &transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
})
if err != nil {
return err
}
select {
case <-ctx.Done():
case <-worker.done.Wait():
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
requestDone := func() error {
//defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
if err := buf.Copy(link.Reader, inboundLink.Writer); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
}
responseDone := func() error {
//defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
if err := buf.Copy(inboundLink.Reader, link.Writer); err != nil {
return err
}
return nil
}
requestDonePost := task.OnSuccess(requestDone, task.Close(inboundLink.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(inboundLink.Reader)
common.Interrupt(inboundLink.Writer)
return errors.New("connection ends").Base(err)
}
return nil
}
Expand Down
92 changes: 16 additions & 76 deletions proxy/vmess/inbound/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ import (
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/common/uuid"
"github.com/xtls/xray-core/core"
feature_inbound "github.com/xtls/xray-core/features/inbound"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/proxy/vmess"
"github.com/xtls/xray-core/proxy/vmess/encoding"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet/stat"
)

Expand Down Expand Up @@ -184,44 +183,6 @@ func (h *Handler) RemoveUser(ctx context.Context, email string) error {
return nil
}

func transferResponse(timer signal.ActivityUpdater, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input buf.Reader, output *buf.BufferedWriter) error {
session.EncodeResponseHeader(response, output)

bodyWriter, err := session.EncodeResponseBody(request, output)
if err != nil {
return errors.New("failed to start decoding response").Base(err)
}
{
// Optimize for small response packet
data, err := input.ReadMultiBuffer()
if err != nil {
return err
}

if err := bodyWriter.WriteMultiBuffer(data); err != nil {
return err
}
}

if err := output.SetBuffered(false); err != nil {
return err
}

if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
return err
}

account := request.User.Account.(*vmess.MemoryAccount)

if request.Option.Has(protocol.RequestOptionChunkStream) && !account.NoTerminationSignal {
if err := bodyWriter.WriteMultiBuffer(buf.MultiBuffer{}); err != nil {
return err
}
}

return nil
}

// Process implements proxy.Inbound.Process().
func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatcher routing.Dispatcher) error {
sessionPolicy := h.policyManager.ForLevel(0)
Expand Down Expand Up @@ -275,49 +236,28 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
inbound.CanSpliceCopy = 3
inbound.User = request.User

sessionPolicy = h.policyManager.ForLevel(request.User.Level)

ctx, cancel := context.WithCancel(ctx)
timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)

ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
link, err := dispatcher.Dispatch(ctx, request.Destination())
bodyReader, err := svrSession.DecodeRequestBody(request, reader)
if err != nil {
return errors.New("failed to dispatch request to ", request.Destination()).Base(err)
return errors.New("failed to start decoding").Base(err)
}

requestDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)

bodyReader, err := svrSession.DecodeRequestBody(request, reader)
if err != nil {
return errors.New("failed to start decoding").Base(err)
}
if err := buf.Copy(bodyReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
return errors.New("failed to transfer request").Base(err)
}
return nil
writer := buf.NewBufferedWriter(buf.NewWriter(connection))
response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request),
}

responseDone := func() error {
defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)

writer := buf.NewBufferedWriter(buf.NewWriter(connection))
defer writer.Flush()

response := &protocol.ResponseHeader{
Command: h.generateCommand(ctx, request),
}
return transferResponse(timer, svrSession, request, response, link.Reader, writer)
svrSession.EncodeResponseHeader(response, writer)
bodyWriter, err := svrSession.EncodeResponseBody(request, writer)
if err != nil {
return errors.New("failed to start decoding response").Base(err)
}
writer.SetFlushNext()

requestDonePost := task.OnSuccess(requestDone, task.Close(link.Writer))
if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
common.Interrupt(link.Reader)
common.Interrupt(link.Writer)
return errors.New("connection ends").Base(err)
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
Reader: bodyReader,
Writer: bodyWriter},
); err != nil {
return errors.New("failed to dispatch request").Base(err)
}

return nil
}

Expand Down
Loading
Loading