001package net.bramp.ffmpeg.progress; 002 003import static com.google.common.base.Preconditions.checkNotNull; 004 005import com.google.common.net.InetAddresses; 006import java.io.IOException; 007import java.net.InetAddress; 008import java.net.URI; 009import java.net.URISyntaxException; 010import java.util.concurrent.CountDownLatch; 011import javax.annotation.CheckReturnValue; 012 013/** Abstract base class for socket-based FFmpeg progress parsers. */ 014public abstract class AbstractSocketProgressParser implements ProgressParser { 015 016 final StreamProgressParser parser; 017 018 Thread thread; // Thread for handling incoming connections 019 020 /** Constructs a new socket progress parser with the given listener. */ 021 public AbstractSocketProgressParser(ProgressListener listener) { 022 this.parser = new StreamProgressParser(listener); 023 } 024 025 /** 026 * Creates a URL to parse to FFmpeg based on the scheme, address and port. 027 * 028 * <p>TODO Move this method to somewhere better. 029 * 030 * @param scheme The scheme to use (e.g. "tcp", "udp", "rtp", "http") 031 * @param address The address of the server 032 * @param port The port to connect to 033 * @return A URI representing the address and port 034 * @throws URISyntaxException if the URI is invalid 035 */ 036 @CheckReturnValue 037 static URI createUri(String scheme, InetAddress address, int port) throws URISyntaxException { 038 checkNotNull(address); 039 return new URI( 040 scheme, 041 null /* userInfo */, 042 InetAddresses.toUriString(address), 043 port, 044 null /* path */, 045 null /* query */, 046 null /* fragment */); 047 } 048 049 /** Returns the name for the background thread handling connections. */ 050 @CheckReturnValue 051 protected abstract String getThreadName(); 052 053 /** Returns the runnable that handles incoming progress data. */ 054 protected abstract Runnable getRunnable(CountDownLatch startSignal); 055 056 /** 057 * Starts the ProgressParser waiting for progress. 058 * 059 * @exception IllegalThreadStateException if the parser was already started. 060 */ 061 @Override 062 public synchronized void start() { 063 if (thread != null) { 064 throw new IllegalThreadStateException("Parser already started"); 065 } 066 067 String name = getThreadName() + "(" + getUri().toString() + ")"; 068 069 CountDownLatch startSignal = new CountDownLatch(1); 070 Runnable runnable = getRunnable(startSignal); 071 072 thread = new Thread(runnable, name); 073 thread.start(); 074 075 // Block until the thread has started 076 try { 077 startSignal.await(); 078 } catch (InterruptedException e) { 079 Thread.currentThread().interrupt(); 080 } 081 } 082 083 @Override 084 public void stop() throws IOException { 085 if (thread != null) { 086 thread.interrupt(); // This unblocks processStream(); 087 088 try { 089 thread.join(); 090 } catch (InterruptedException e) { 091 Thread.currentThread().interrupt(); 092 } 093 } 094 } 095 096 @Override 097 public void close() throws IOException { 098 stop(); 099 } 100}