本文介绍: 上篇文章说了风头正健的pion生态之livekit,现在轮到pion生态第一个sfu ion,这个由国内大佬鱼大等主持开发两年多开源项目,为国人乃至开源社区普及pion起了至关重要的作用,得到了Sean-Der的大力支持,也汇集了众多高手加盟,livekit的风格也深受其影响,下面是发布ion的习作,望各位大佬指正。………
上篇文章说了风头正健的pion生态之livekit,现在轮到pion生态第一个sfu ion,这个由国内大佬鱼大等主持开发两年多开源项目,为国人乃至开源社区普及pion起了至关重要的作用,得到了Sean-Der的大力支持,也汇集了众多高手加盟,livekit的风格也深受其影响,下面是发布ion的习作,望各位大佬指正
package livekitclient
import (
"fmt"
"strings"
"time"
// "github.com/livekit/server-sdk-go/pkg/media/ivfwriter"
"github.com/livekit/server-sdk-go/pkg/samplebuilder"
ionsdk "github.com/pion/ion-sdk-go"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/pion/webrtc/v3/pkg/media/h264writer"
"github.com/pion/webrtc/v3/pkg/media/ivfwriter"
"github.com/pion/webrtc/v3/pkg/media/oggwriter"
"github.com/xiangxud/rtmp_webrtc_server/identity"
"github.com/xiangxud/rtmp_webrtc_server/log"
// "github.com/livekit/server-sdk-go/pkg/samplebuilder"
)
const (
// sid = "ion"
// uid = ionsdk.RandomKey(6)
)
func (t *LocalTrackPublication) saveToDisk(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
codec := track.Codec()
var fileWriter media.Writer
var err error
if strings.EqualFold(codec.MimeType, webrtc.MimeTypeOpus) {
log.Infof("Got Opus track, saving to disk as ogg (48 kHz, 2 channels)")
fileWriter, err = oggwriter.New(fmt.Sprintf("%d_%d.ogg", codec.PayloadType, track.SSRC()), 48000, 2)
} else if strings.EqualFold(codec.MimeType, webrtc.MimeTypeVP8) {
log.Infof("Got VP8 track, saving to disk as ivf")
fileWriter, err = ivfwriter.New(fmt.Sprintf("%d_%d.ivf", codec.PayloadType, track.SSRC()))
} else if strings.EqualFold(codec.MimeType, webrtc.MimeTypeH264) {
log.Infof("Got H264 track, saving to disk as h264")
fileWriter, err = h264writer.New(fmt.Sprintf("%d_%d.h264", codec.PayloadType, track.SSRC()))
}
if err != nil {
log.Errorf("error: %v", err)
fileWriter.Close()
return
}
for {
rtpPacket, _, err := track.ReadRTP()
if err != nil {
log.Warnf("track.ReadRTP error: %v", err)
break
}
if err := fileWriter.WriteRTP(rtpPacket); err != nil {
log.Warnf("fileWriter.WriteRTP error: %v", err)
break
}
}
}
func (t *LocalTrackPublication) INORoomRTCJoin(r *Room, streamname, identify string) (*ionsdk.RTC, error) {
// join room
uid := streamname + ":" + identify
err := r.IONRoom.Join(
ionsdk.JoinInfo{
Sid: identify,
Uid: uid,
DisplayName: uid,
Role: ionsdk.Role_Host,
Protocol: ionsdk.Protocol_WebRTC,
Direction: ionsdk.Peer_BILATERAL,
},
)
if err != nil {
log.Errorf("Join error: %v", err)
return nil, err
}
// new sdk engine
config := ionsdk.RTCConfig{
WebRTC: ionsdk.WebRTCTransportConfig{
VideoMime: ionsdk.MimeTypeH264,
},
}
joinedch := make(chan struct{})
r.IONRoom.OnJoin = func(success bool, info ionsdk.RoomInfo, err error) {
// THIS IS ROOM SINGAL API
// ===============================
rtc, err1 := ionsdk.NewRTC(r.IONConnector, config)
if err1 != nil {
log.Error(err1)
return
}
// user define receiving rtp
rtc.OnTrack = t.saveToDisk
rtc.GetPubTransport().GetPeerConnection().OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
log.Infof("Connection state changed: %s", state)
})
rtc.OnDataChannel = func(dc *webrtc.DataChannel) {
log.Infof("dc: %v", dc.Label())
}
rtc.OnError = func(err error) {
log.Errorf("err: %v", err)
}
configjoin := ionsdk.JoinConfig{}
configjoin.SetNoPublish()
configjoin.SetNoSubscribe()
configjoin.SetNoAutoSubscribe()
uid = ionsdk.RandomKey(6)
err1 = rtc.Join(identify, uid, &configjoin)
if err1 != nil {
log.Errorf("error: %v", err1)
return
}
log.Infof("rtc.Join ok sid=%v username =%v", identify, uid)
// err = rtc.Join(session, ionsdk.RandomKey(4))
// if err != nil {
// log.Errorf("error: %v", err)
// return
// }
t.IONRtc = rtc
joinedch <- struct{}{}
}
<-joinedch
return t.IONRtc, nil
}
func (r *Room) CreateIonRoom(addr, session string) (*ionsdk.Room, error) {
log.Debug("CreateIonRoom: ", addr, " session: ", session)
r.HostIon = addr
connector := ionsdk.NewConnector(addr)
// uid := ionsdk.RandomKey(6)
room := ionsdk.NewRoom(connector)
peers := room.GetPeers(session)
if len(peers) != 0 {
log.Debug("room is exit peers :", peers)
// err := room.Join(
// ionsdk.JoinInfo{
// Sid: session,
// Uid: uid,
// DisplayName: uid,
// Role: ionsdk.Role_Host,
// Protocol: ionsdk.Protocol_WebRTC,
// Direction: ionsdk.Peer_BILATERAL,
// },
// )
// if err != nil {
// log.Errorf("Join error: %v", err)
// return nil, err
// }
r.IONRoom = room
r.IONConnector = connector
return room, nil
}
// THIS IS ROOM MANAGEMENT API
// ==========================
// create room
err := room.CreateRoom(ionsdk.RoomInfo{Sid: session})
if err != nil {
log.Errorf("error:%v", err)
return nil, err
}
// // new sdk engine
// config := ionsdk.RTCConfig{
// WebRTC: ionsdk.WebRTCTransportConfig{
// VideoMime: ionsdk.MimeTypeH264,
// },
// }
// // THIS IS ROOM SINGAL API
// // ===============================
// rtc, err := ionsdk.NewRTC(connector, config)
// if err != nil {
// log.Error(err)
// // return err
// }
// // user define receiving rtp
// rtc.OnTrack = r.saveToDisk
// rtc.OnDataChannel = func(dc *webrtc.DataChannel) {
// log.Infof("dc: %v", dc.Label())
// }
// rtc.OnError = func(err error) {
// log.Errorf("err: %v", err)
// }
// err = rtc.Join(session, ionsdk.RandomKey(4))
// if err != nil {
// log.Errorf("error: %v", err)
// return nil, err
// }
// log.Infof("rtc.Join ok sid=%v", session)
// // err = rtc.Join(session, ionsdk.RandomKey(4))
// // if err != nil {
// // log.Errorf("error: %v", err)
// // return
// // }
// r.IONRtc = rtc
room.OnJoin = func(success bool, info ionsdk.RoomInfo, err error) {
log.Infof("OnJoin success = %v, info = %v, err = %v", success, info, err)
}
room.OnLeave = func(success bool, err error) {
log.Infof("OnLeave success = %v err = %v", success, err)
}
room.OnPeerEvent = func(state ionsdk.PeerState, peer ionsdk.PeerInfo) {
log.Infof("OnPeerEvent state = %v, peer = %v", state, peer)
}
room.OnMessage = func(from string, to string, data map[string]interface{}) {
log.Infof("OnMessage from = %v, to = %v, data = %v", from, to, data)
}
room.OnDisconnect = func(sid, reason string) {
log.Infof("OnDisconnect sid = %v, reason = %v", sid, reason)
}
room.OnRoomInfo = func(info ionsdk.RoomInfo) {
log.Infof("OnRoomInfo info=%v", info)
}
// join room
// err = room.Join(
// ionsdk.JoinInfo{
// Sid: session,
// Uid: uid,
// DisplayName: uid,
// Role: ionsdk.Role_Host,
// Protocol: ionsdk.Protocol_WebRTC,
// Direction: ionsdk.Peer_BILATERAL,
// },
// )
// if err != nil {
// log.Errorf("Join error: %v", err)
// return nil, err
// }
r.IONRoom = room
r.IONConnector = connector
return room, nil
}
// func (t *LocalTrackPublication) ConnectRoomIon(host, identity string) error {
// // host := "<host>"
// // apiKey := "api-key"
// // apiSecret := "api-secret"
// // roomName := "myroom"
// // identity := "botuser"
// room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
// APIKey: apikey,
// APISecret: apisecret,
// RoomName: roomname,
// ParticipantIdentity: identity,
// })
// if err != nil {
// log.Debug(err)
// return err
// }
// t.LiveKitRoomConnect = room
// room.Callback.OnTrackSubscribed = t.TrackSubscribed
// return nil
// // room.Disconnect()
// }
func (r *Room) TrackPublished_to_ION(streamname string) error {
// - `in` implements io.ReadCloser, such as buffer or file
// - `mime` has to be one of webrtc.MimeType...
// videoTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264})
r.ionlock.Lock()
defer r.ionlock.Unlock()
if r.Ctx != nil && r.IONRoom == nil {
var err error
sn, _ := identity.GetSN()
r.IONRoom, err = r.CreateIonRoom(r.HostIon, sn)
if err != nil {
log.Debug("room->", sn, "create room ok", r)
return err
}
}
t := r.Localtracks[streamname]
if t == nil {
t = &LocalTrackPublication{Streamname: streamname}
t.INORoomRTCJoin(r, streamname, r.Identity)
log.Debug("ion track->", streamname, "<-is nil ,Connect room", t, r)
} else {
if !t.IONRtc.Connected() {
t.INORoomRTCJoin(r, streamname, r.Identity)
log.Debug("ion track->", streamname, "<-is nil ,re Connect room", t, r)
}
}
if t.IONSfuTrack.VideoTrack == nil && t.Videopub == nil && t.IONSfuTrack.AudioTrack == nil && t.Audiopub == nil {
videoTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, streamname+"-video", streamname)
if err != nil {
panic(err)
}
// r.RoomClient.MutePublishedTrack(r.Ctx,)
// var local_video *lksdk.LocalTrackPublication
if _, err = t.IONRtc.Publish(videoTrack); err != nil {
log.Debug("Error publishing video track->", err)
return err
}
t.IONSfuTrack.VideoTrack = videoTrack
// r.Localtracks[streamname] = &LocalTrackPublication{p: local_video, Track: videoTrack, Trackname: streamname + "-video"}
log.Debug("[TrackPublished_to_ION]", "published video track -> ", streamname)
if t.IONSfuTrack.AudioTrack == nil {
audioTrack, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, streamname+"-audio", streamname)
//audioTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus})
if err != nil {
panic(err)
}
// var local_audio *lksdk.LocalTrackPublication
if _, err = t.IONRtc.Publish(audioTrack); err != nil {
log.Debug("Error publishing audio track->", err)
return err
}
t.IONSfuTrack.AudioTrack = audioTrack
log.Debug("[TrackPublished_to_ION]", "published audio track -> ", streamname)
}
r.Localtracks[streamname] = t
} else {
log.Debug(streamname, "is exit publish")
}
return nil
}
func (r *Room) RTPTrackPublished_to_ION(trackRemote []*webrtc.TrackRemote, streamname string) error {
// - `in` implements io.ReadCloser, such as buffer or file
// - `mime` has to be one of webrtc.MimeType...
// videoTrack, err := lksdk.NewLocalSampleTrack(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264})
// r.mux.lock()
r.ionlock.Lock()
defer r.ionlock.Unlock()
if r.Ctx != nil && r.IONRoom == nil {
var err error
sn, _ := identity.GetSN()
r.IONRoom, err = r.CreateIonRoom(r.HostIon, sn)
if err != nil {
log.Debug("room->", sn, "create room ok", r)
return err
}
}
t := r.Localtracks[streamname]
if t == nil {
t = &LocalTrackPublication{Streamname: streamname}
t.INORoomRTCJoin(r, streamname, r.Identity)
log.Debug("track->", streamname, "<-is nil ,Connect room", t, r)
} else {
if !t.IONRtc.Connected() {
t.INORoomRTCJoin(r, streamname, r.Identity)
log.Debug("ion track->", streamname, "<-is nil ,re Connect room", t, r)
}
}
for _, v := range trackRemote {
if v.Kind().String() == "video" {
if t.IONSfuTrack.VideoRTPTrack == nil {
if strings.Contains(v.Codec().MimeType, "video") {
videoRTPTrack, err := webrtc.NewTrackLocalStaticRTP(v.Codec().RTPCodecCapability, streamname+"-video", streamname)
if err != nil {
panic(err)
}
var pub []*webrtc.RTPSender
log.Debug("ion track publish", videoRTPTrack)
if pub, err = t.IONRtc.Publish(videoRTPTrack); err != nil {
log.Debug("Error publishing video RTP track->", err)
return err
}
t.IONVideopub = pub[0]
t.IONSfuTrack.VideoRTPTrack = videoRTPTrack
r.Localtracks[streamname] = t
log.Debug("[RTPTrackPublished_to_ION]", "published video track -> ", streamname)
}
}
} else {
if v.Kind().String() == "audio" {
if t.IONSfuTrack.AudioRTPTrack == nil {
if strings.Contains(v.Codec().MimeType, "audio") {
audioRTPTrack, err := webrtc.NewTrackLocalStaticRTP(v.Codec().RTPCodecCapability, streamname+"-audio", streamname)
if err != nil {
panic(err)
}
// var local_audio *lksdk.LocalTrackPublication
var pub []*webrtc.RTPSender
log.Debug("ion track publish", audioRTPTrack)
if pub, err = t.IONRtc.Publish(audioRTPTrack); err != nil {
log.Debug("Error publishing audio track", err)
return err
}
t.IONAudiopub = pub[0]
t.IONSfuTrack.AudioRTPTrack = audioRTPTrack
r.Localtracks[streamname] = t
log.Debug("[RTPTrackPublished_to_ION]", "published audio track -> ", streamname)
}
}
}
}
}
// r.Localtracks[streamname] = t
// } else {
// log.Debug(streamname, "is exit publish")
// }
return nil
}
func (r *Room) TrackSendIonRtpPackets(trackname, kind string, data []byte) (n int, err error) {
if trackname == "" {
log.Debug("Track name is null")
return 0, fmt.Errorf("input trackname is null")
}
// var t *webrtc.TrackLocalStaticSample
var t *webrtc.TrackLocalStaticRTP
track := r.Localtracks[trackname]
if track == nil {
log.Debug("TrackSendIonRtpPackets:", "Track is nil ->", trackname, "<- no to publish")
return 0, fmt.Errorf(" track is null,no to publish")
}
if kind == "video" {
t = track.IONSfuTrack.VideoRTPTrack
} else if kind == "audio" {
t = track.IONSfuTrack.AudioRTPTrack
}
if t == nil {
log.Debug("TrackSendIonRtpPackets:", "t is nil ->", trackname, "<- no to publish")
return 0, fmt.Errorf(" track is null,no to publish")
}
var sb *samplebuilder.SampleBuilder
packets := &rtp.Packet{}
if err := packets.Unmarshal(data); err != nil {
return 0, err
}
sb.Push(packets)
for _, p := range sb.PopPackets() {
err = t.WriteRTP(p)
if err != nil {
log.Debug("[TrackSendIonRtpPackets] error", err)
return 0, err
}
}
//n, err = t.Write(data)
return len(data), nil
}
func (r *Room) TrackSendIonData(trackname, kind string, data []byte, duration time.Duration) error {
if trackname == "" {
log.Debug("Track name is null")
return fmt.Errorf("input trackname is null")
}
var t *webrtc.TrackLocalStaticSample
track := r.Localtracks[trackname]
if track == nil {
log.Debug("Track is nil ->", trackname, "<- no to publish")
return fmt.Errorf(" track is null,no to publish")
}
if kind == "video" {
t = track.IONSfuTrack.VideoTrack
} else if kind == "audio" {
t = track.IONSfuTrack.AudioTrack
}
if t == nil {
log.Debug("Track is nil ->", trackname, "<- no to publish")
return fmt.Errorf(" track is null,no to publish")
}
if videoErr := t.WriteSample(media.Sample{
Data: data,
Duration: duration,
}); videoErr != nil {
log.Debug("WriteSample err", videoErr)
// r.ConnectRoom()
return nil //fmt.Errorf("WriteSample err %s", vedioErr)
}
return nil
}
func (r *Room) TrackCloseION(streamname string) error {
if t := r.Localtracks[streamname]; t != nil {
var pub []*webrtc.RTPSender
if r.Localtracks[streamname].IONVideopub != nil {
pub = append(pub, r.Localtracks[streamname].IONVideopub)
}
if r.Localtracks[streamname].IONAudiopub != nil {
pub = append(pub, r.Localtracks[streamname].IONAudiopub)
}
if pub != nil {
t.IONRtc.UnPublish(pub...)
t.LiveKitRoomConnect.Disconnect()
}
r.Localtracks[streamname] = nil
log.Debug("track ", streamname, "lost ,now removed", r)
//r.RoomConnect.LocalParticipant.UnpublishTrack(r.RoomConnect.LocalParticipant.SID())
// r.Localtracks[streamname+"-video"]
}
return nil
}
原文地址:https://blog.csdn.net/superxxd/article/details/126297780
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_25322.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。