Add observer for peer connections

The observer is just an adapter for the "PeerConnection.Observer"
provided by the WebRTC library; a custom observer is used to expose only
the events needed outside "PeerConnectionWrapper".

For now only the same events that were already handled are taken into
account, but at a later point additional events (like "onAddTrack"
instead of "onAddStream", which is deprecated) could be added too.

Note that the thread used to handle the events has changed; the EventBus
subscriber mode was "MAIN", but as the events were posted from a
PeerConnection observer, which run in a worker thread rather than in the
main thread, the subscriber was executed in the main thread rather than
in the same thread as the poster. Due to this the actions performed by
the handler now must be explicitly run in the main thread.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2022-11-22 19:24:39 +01:00
parent fcbfc1926d
commit 04f1679e2a
3 changed files with 180 additions and 73 deletions

View file

@ -63,7 +63,6 @@ import com.nextcloud.talk.application.NextcloudTalkApplication;
import com.nextcloud.talk.data.user.model.User;
import com.nextcloud.talk.databinding.CallActivityBinding;
import com.nextcloud.talk.events.ConfigurationChangeEvent;
import com.nextcloud.talk.events.MediaStreamEvent;
import com.nextcloud.talk.events.NetworkEvent;
import com.nextcloud.talk.events.PeerConnectionEvent;
import com.nextcloud.talk.events.WebSocketCommunicationEvent;
@ -269,6 +268,8 @@ public class CallActivity extends CallBaseActivity {
private Map<String, PeerConnectionWrapper.DataChannelMessageListener> dataChannelMessageListeners = new HashMap<>();
private Map<String, PeerConnectionWrapper.PeerConnectionObserver> peerConnectionObservers = new HashMap<>();
private SignalingMessageReceiver.ParticipantListMessageListener participantListMessageListener = new SignalingMessageReceiver.ParticipantListMessageListener() {
@Override
@ -1989,6 +1990,11 @@ public class CallActivity extends CallBaseActivity {
signalingMessageReceiver.addListener(offerAnswerNickProvider.getScreenWebRtcMessageListener(), sessionId, "screen");
}
PeerConnectionWrapper.PeerConnectionObserver peerConnectionObserver =
new CallActivityPeerConnectionObserver(sessionId, type);
peerConnectionObservers.put(sessionId + "-" + type, peerConnectionObserver);
peerConnectionWrapper.addObserver(peerConnectionObserver);
if (!publisher) {
runOnUiThread(() -> {
// userId is unknown here, but it will be got based on the session id, and the stream will be
@ -2031,6 +2037,9 @@ public class CallActivity extends CallBaseActivity {
}
String videoStreamType = peerConnectionWrapper.getVideoStreamType();
if (VIDEO_STREAM_TYPE_SCREEN.equals(videoStreamType) || !justScreen) {
PeerConnectionWrapper.PeerConnectionObserver peerConnectionObserver = peerConnectionObservers.remove(sessionId + "-" + videoStreamType);
peerConnectionWrapper.removeObserver(peerConnectionObserver);
runOnUiThread(() -> removeMediaStream(sessionId, videoStreamType));
deletePeerConnection(peerConnectionWrapper);
}
@ -2116,24 +2125,7 @@ public class CallActivity extends CallBaseActivity {
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(PeerConnectionEvent peerConnectionEvent) {
String sessionId = peerConnectionEvent.getSessionId();
if (peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_CONNECTED ||
peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_COMPLETED) {
handlePeerConnected(sessionId, peerConnectionEvent.getVideoStreamType());
} else if (peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_DISCONNECTED ||
peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_NEW ||
peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_CHECKING) {
handlePeerDisconnected(sessionId, peerConnectionEvent.getVideoStreamType());
} else if (peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_CLOSED) {
endPeerConnection(sessionId, VIDEO_STREAM_TYPE_SCREEN.equals(peerConnectionEvent.getVideoStreamType()));
} else if (peerConnectionEvent.getPeerConnectionEventType() ==
if (peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.SENSOR_FAR ||
peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.SENSOR_NEAR) {
@ -2147,15 +2139,6 @@ public class CallActivity extends CallBaseActivity {
toggleMedia(enableVideo, true);
}
}
} else if (peerConnectionEvent.getPeerConnectionEventType() ==
PeerConnectionEvent.PeerConnectionEventType.PEER_FAILED) {
if (webSocketClient != null && webSocketClient.getSessionId() != null && webSocketClient.getSessionId().equals(sessionId)) {
setCallState(CallStatus.PUBLISHER_FAILED);
webSocketClient.clearResumeId();
hangup(false);
} else {
handlePeerDisconnected(sessionId, peerConnectionEvent.getVideoStreamType());
}
}
}
@ -2221,25 +2204,6 @@ public class CallActivity extends CallBaseActivity {
}
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MediaStreamEvent mediaStreamEvent) {
String participantDisplayItemId = mediaStreamEvent.getSession() + "-" + mediaStreamEvent.getVideoStreamType();
if (participantDisplayItems.get(participantDisplayItemId) == null) {
return;
}
boolean hasAtLeastOneVideoStream = false;
if (mediaStreamEvent.getMediaStream() != null) {
hasAtLeastOneVideoStream = mediaStreamEvent.getMediaStream().videoTracks != null
&& mediaStreamEvent.getMediaStream().videoTracks.size() > 0;
}
ParticipantDisplayItem participantDisplayItem = participantDisplayItems.get(participantDisplayItemId);
participantDisplayItem.setMediaStream(mediaStreamEvent.getMediaStream());
participantDisplayItem.setStreamEnabled(hasAtLeastOneVideoStream);
participantsAdapter.notifyDataSetChanged();
}
@Override
public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions,
@NonNull int[] grantResults) {
@ -2694,6 +2658,71 @@ public class CallActivity extends CallBaseActivity {
}
}
private class CallActivityPeerConnectionObserver implements PeerConnectionWrapper.PeerConnectionObserver {
private final String sessionId;
private final String videoStreamType;
private final String participantDisplayItemId;
private CallActivityPeerConnectionObserver(String sessionId, String videoStreamType) {
this.sessionId = sessionId;
this.videoStreamType = videoStreamType;
this.participantDisplayItemId = sessionId + "-" + videoStreamType;
}
@Override
public void onStreamAdded(MediaStream mediaStream) {
handleStream(mediaStream);
}
@Override
public void onStreamRemoved(MediaStream mediaStream) {
handleStream(null);
}
private void handleStream(MediaStream mediaStream) {
runOnUiThread(() -> {
if (participantDisplayItems.get(participantDisplayItemId) == null) {
return;
}
boolean hasAtLeastOneVideoStream = false;
if (mediaStream != null) {
hasAtLeastOneVideoStream = mediaStream.videoTracks != null && mediaStream.videoTracks.size() > 0;
}
ParticipantDisplayItem participantDisplayItem = participantDisplayItems.get(participantDisplayItemId);
participantDisplayItem.setMediaStream(mediaStream);
participantDisplayItem.setStreamEnabled(hasAtLeastOneVideoStream);
participantsAdapter.notifyDataSetChanged();
});
}
@Override
public void onIceConnectionStateChanged(PeerConnection.IceConnectionState iceConnectionState) {
runOnUiThread(() -> {
if (iceConnectionState == PeerConnection.IceConnectionState.CONNECTED ||
iceConnectionState == PeerConnection.IceConnectionState.COMPLETED) {
handlePeerConnected(sessionId, videoStreamType);
} else if (iceConnectionState == PeerConnection.IceConnectionState.DISCONNECTED ||
iceConnectionState == PeerConnection.IceConnectionState.NEW ||
iceConnectionState == PeerConnection.IceConnectionState.CHECKING) {
handlePeerDisconnected(sessionId, videoStreamType);
} else if (iceConnectionState == PeerConnection.IceConnectionState.CLOSED) {
endPeerConnection(sessionId, VIDEO_STREAM_TYPE_SCREEN.equals(videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.FAILED) {
if (webSocketClient != null && webSocketClient.getSessionId() != null && webSocketClient.getSessionId().equals(sessionId)) {
setCallState(CallStatus.PUBLISHER_FAILED);
webSocketClient.clearResumeId();
hangup(false);
} else {
handlePeerDisconnected(sessionId, videoStreamType);
}
}
});
}
}
private class InternalSignalingMessageSender implements SignalingMessageSender {
@Override

View file

@ -0,0 +1,68 @@
/*
* Nextcloud Talk application
*
* @author Daniel Calviño Sánchez
* Copyright (C) 2022 Daniel Calviño Sánchez <danxuliu@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.nextcloud.talk.webrtc;
import org.webrtc.MediaStream;
import org.webrtc.PeerConnection;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* Helper class to register and notify PeerConnectionObserver.
*
* This class is only meant for internal use by PeerConnectionWrapper; observers must register themselves against
* a PeerConnectionWrapper rather than against a PeerConnectionNotifier.
*/
public class PeerConnectionNotifier {
private final Set<PeerConnectionWrapper.PeerConnectionObserver> peerConnectionObservers = new LinkedHashSet<>();
public synchronized void addObserver(PeerConnectionWrapper.PeerConnectionObserver observer) {
if (observer == null) {
throw new IllegalArgumentException("PeerConnectionObserver can not be null");
}
peerConnectionObservers.add(observer);
}
public synchronized void removeObserver(PeerConnectionWrapper.PeerConnectionObserver observer) {
peerConnectionObservers.remove(observer);
}
public synchronized void notifyStreamAdded(MediaStream stream) {
for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) {
observer.onStreamAdded(stream);
}
}
public synchronized void notifyStreamRemoved(MediaStream stream) {
for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) {
observer.onStreamRemoved(stream);
}
}
public synchronized void notifyIceConnectionStateChanged(PeerConnection.IceConnectionState state) {
for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) {
observer.onIceConnectionStateChanged(state);
}
}
}

View file

@ -28,8 +28,6 @@ import android.util.Log;
import com.bluelinelabs.logansquare.LoganSquare;
import com.nextcloud.talk.application.NextcloudTalkApplication;
import com.nextcloud.talk.events.MediaStreamEvent;
import com.nextcloud.talk.events.PeerConnectionEvent;
import com.nextcloud.talk.models.json.signaling.DataChannelMessage;
import com.nextcloud.talk.models.json.signaling.NCIceCandidate;
import com.nextcloud.talk.models.json.signaling.NCMessagePayload;
@ -37,7 +35,6 @@ import com.nextcloud.talk.models.json.signaling.NCSignalingMessage;
import com.nextcloud.talk.signaling.SignalingMessageReceiver;
import com.nextcloud.talk.signaling.SignalingMessageSender;
import org.greenrobot.eventbus.EventBus;
import org.webrtc.AudioTrack;
import org.webrtc.DataChannel;
import org.webrtc.IceCandidate;
@ -85,6 +82,21 @@ public class PeerConnectionWrapper {
void onNickChanged(String nick);
}
/**
* Observer for changes on the peer connection.
*
* The changes are bound to a specific peer connection, so each observer is expected to handle messages only for
* a single peer connection.
*
* All methods are called on the so called "signaling" thread of WebRTC, which is an internal thread created by the
* WebRTC library and NOT the same thread where signaling messages are received.
*/
public interface PeerConnectionObserver {
void onStreamAdded(MediaStream mediaStream);
void onStreamRemoved(MediaStream mediaStream);
void onIceConnectionStateChanged(PeerConnection.IceConnectionState iceConnectionState);
}
private static final String TAG = PeerConnectionWrapper.class.getCanonicalName();
private final SignalingMessageReceiver signalingMessageReceiver;
@ -94,6 +106,8 @@ public class PeerConnectionWrapper {
private final DataChannelMessageNotifier dataChannelMessageNotifier = new DataChannelMessageNotifier();
private final PeerConnectionNotifier peerConnectionNotifier = new PeerConnectionNotifier();
private List<IceCandidate> iceCandidates = new ArrayList<>();
private PeerConnection peerConnection;
private String sessionId;
@ -186,6 +200,21 @@ public class PeerConnectionWrapper {
dataChannelMessageNotifier.removeListener(listener);
}
/**
* Adds an observer for peer connection changes.
*
* An observer is expected to be added only once. If the same observer is added again it will be notified just once.
*
* @param observer the PeerConnectionObserver
*/
public void addObserver(PeerConnectionObserver observer) {
peerConnectionNotifier.addObserver(observer);
}
public void removeObserver(PeerConnectionObserver observer) {
peerConnectionNotifier.removeObserver(observer);
}
public String getVideoStreamType() {
return videoStreamType;
}
@ -412,31 +441,12 @@ public class PeerConnectionWrapper {
Log.d("iceConnectionChangeTo: ", iceConnectionState.name() + " over " + peerConnection.hashCode() + " " + sessionId);
if (iceConnectionState == PeerConnection.IceConnectionState.CONNECTED) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_CONNECTED,
sessionId, null, null, videoStreamType));
if (hasInitiated) {
sendInitialMediaStatus();
}
} else if (iceConnectionState == PeerConnection.IceConnectionState.COMPLETED) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_COMPLETED,
sessionId, null, null, videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.CLOSED) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType
.PEER_CLOSED, sessionId, null, null, videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.DISCONNECTED) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_DISCONNECTED,
sessionId, null, null, videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.NEW) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_NEW,
sessionId, null, null, videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.CHECKING) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_CHECKING,
sessionId, null, null, videoStreamType));
} else if (iceConnectionState == PeerConnection.IceConnectionState.FAILED) {
EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_FAILED,
sessionId, null, null, videoStreamType));
}
peerConnectionNotifier.notifyIceConnectionStateChanged(iceConnectionState);
}
@Override
@ -473,12 +483,12 @@ public class PeerConnectionWrapper {
@Override
public void onAddStream(MediaStream mediaStream) {
EventBus.getDefault().post(new MediaStreamEvent(mediaStream, sessionId, videoStreamType));
peerConnectionNotifier.notifyStreamAdded(mediaStream);
}
@Override
public void onRemoveStream(MediaStream mediaStream) {
EventBus.getDefault().post(new MediaStreamEvent(null, sessionId, videoStreamType));
peerConnectionNotifier.notifyStreamRemoved(mediaStream);
}
@Override