// :gcc -o recv_dec_play recv_dec_play.c -lspeex -logg -lasound -lortp
/*************************************************************************
* Filename: recv_dec_play.c
* Author: Gongguan
* Description: Receive an Ogg/Speex audio stream over RTP, decode it and play it through ALSA.
* Version: Trial
* Date: 2012-03-16
* History: none
*************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <ortp/ortp.h>
#include <signal.h>
#include <speex/speex.h>
#include <ogg/ogg.h>
#include <math.h>
#include <string.h>
#include <speex/speex_header.h>
#include <speex/speex_stereo.h>
#include <speex/speex_callbacks.h>
#include <alsa/asoundlib.h>
#include <assert.h>
#include <termios.h>
#include <sys/poll.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <sys/signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <endian.h>
/* Capacity, in 16-bit samples, of the decoder's local PCM buffers. */
#define DEC_FRAME_SIZE 640
/* Speex mode selected by process_header() from the stream header. */
static const SpeexMode *mode;
/* The one mutex that is passed to every pthread_cond_wait() in this file. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by receive() after a packet was copied into recv_buf; decode() waits on it. */
static pthread_cond_t recv_over = PTHREAD_COND_INITIALIZER;
/* Signalled by play() after pcm_write() returned; decode() waits on it. */
static pthread_cond_t play_over = PTHREAD_COND_INITIALIZER;
/* Signalled by decode() once it has consumed recv_buf; receive() waits on it. */
static pthread_cond_t ready_recv = PTHREAD_COND_INITIALIZER;
/* Signalled by decode() after it filled audio_buf; play() waits on it. */
static pthread_cond_t ready_play = PTHREAD_COND_INITIALIZER;
/* Thread entry-point signature (not referenced elsewhere in the visible source). */
typedef void* (*fun)(void*);
/* Thread entry points, started from main(). */
void* receive(void*);
void* decode(void*);
void* play(void*);
/* Thread identifiers for the decoder, player and receiver threads. */
pthread_t tdec, tplay, trecv;
/*
 * SIGINT handler installed by receive(): terminate the process with status 1.
 *
 * _exit() is used instead of exit() because exit() is not async-signal-safe
 * (it runs atexit handlers and flushes stdio, which may be in an arbitrary
 * state when the signal arrives).
 */
void stop_handler(int signum)
{
    (void)signum;
    _exit(1);
}
/*
 * Callback connected to the RTP session's "ssrc_changed" signal in receive():
 * just reports the change on stdout.
 */
void ssrc_cb(RtpSession *session)
{
    (void)session;
    printf("hey, the ssrc has changed !\n");
}
/* Hand-off buffer from receive() to decode(); allocated (160 bytes) in main(). */
static u_char *recv_buf = NULL;
/* Number of bytes returned by the last rtp_session_recv_with_ts() call in receive(). */
static int recv_size = 0;
/* MULAW / ALAW are not referenced anywhere in the visible source. */
#define MULAW 0
#define ALAW 1
/* Local port handed to rtp_session_set_local_addr() in receive(). */
#define PORT 5000
/* Size of receive()'s packet buffer; also the amount the RTP timestamp is advanced per poll. */
#define RECV_SIZE 160
/*
* Now, for player
*/
/* Defaults copied into hwparams by play() before the device is configured. */
#define DEFAULT_FORMAT SND_PCM_FORMAT_S16_LE
#define DEFAULT_SPEED 8000
#define CHANNELS 2
/* PLAYBACK_TIME is not referenced anywhere in the visible source. */
#define PLAYBACK_TIME 3
/* Frames written per pcm_write() call in play(); main() sizes audio_buf as PERIOD_FRAMES*4 bytes. */
#define PERIOD_FRAMES 160
/* Write primitive used by pcm_write(); play() sets it to snd_pcm_writei. */
static snd_pcm_sframes_t (*writei_func)(snd_pcm_t *handle, const void *buffer, snd_pcm_uframes_t size);
/* PCM device handle, opened in play(). */
static snd_pcm_t *handle;
/* Requested hardware configuration; set_params() may adjust rate to the nearest supported value. */
static struct {
snd_pcm_format_t format;
unsigned int channels;
unsigned int rate;
} hwparams;
static int quiet_mode = 0;// when 0, informational messages are printed to stderr
/* Stream direction passed to snd_pcm_open(). */
static snd_pcm_stream_t stream = SND_PCM_STREAM_PLAYBACK;
/* Mode passed to snd_pcm_nonblock() after draining in play(). */
static int nonblock = 0;
/* Set by set_params() from snd_pcm_hw_params_can_pause(); checked by do_pause(). */
static int can_pause = 0;
/* Hand-off buffer from decode() to play(); allocated in main(). */
static short *audio_buf = NULL;
/* Period size in frames; overwritten by set_params() with the value the device accepted. */
static snd_pcm_uframes_t chunk_size = PERIOD_FRAMES;
/* Period/buffer requests; 0 means "not specified" and is resolved inside set_params(). */
static unsigned period_time = 0;
static unsigned buffer_time = 0;
static snd_pcm_uframes_t period_frames = PERIOD_FRAMES;
static snd_pcm_uframes_t buffer_frames = 0;
/* Negative: set_params() uses chunk_size as avail_min. */
static int avail_min = -1;
/* Inputs to the start/stop threshold computation in set_params(). */
static int start_delay = 0;
static int stop_delay = 0;
/* Set by set_params() from snd_pcm_hw_params_is_monotonic(); selects the message printed by xrun(). */
static int monotonic = 0;
/* Derived sizes computed at the end of set_params(). */
static size_t bits_per_sample, bits_per_frame;
static size_t chunk_bytes;
/* When non-zero, pcm_write() does not call snd_pcm_wait() after a short write. */
static int test_nowait = 0;
/* Set to 1 by the SIGUSR1 handler; never read in the visible source. */
volatile static int recycle_capture_file = 0;
/* Declared only; no definition appears in the visible source. */
static void playback(char *filename);
/*
 * error(fmt, ...): print "<function>:<line>: <formatted message>" followed by
 * a newline on stderr.  The first branch uses C99 variadic macros, the second
 * the older GNU named-variadic syntax.
 */
#if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 95)
#define error(...) do {\
    fprintf(stderr, "%s:%d: ", __FUNCTION__, __LINE__); \
    fprintf(stderr, __VA_ARGS__); \
    putc('\n', stderr); \
} while (0)
#else
#define error(args...) do {\
    fprintf(stderr, "%s:%d: ", __FUNCTION__, __LINE__); \
    fprintf(stderr, ##args); \
    putc('\n', stderr); \
} while (0)
#endif
/*
* End player
*/
/***************************************************************************
 * Receiver thread: pull the Ogg audio stream from the network over RTP.
 *
 * Every non-empty packet is copied into the shared recv_buf (its length is
 * left in recv_size); the decoder is then woken through recv_over and this
 * thread blocks on ready_recv until the decoder has consumed the data.
 * The shared mutex is held for the whole loop and is only released while
 * waiting on ready_recv.
 *
 * arg is unused.  The loop never terminates; the function does not return.
 ***************************************************************************/
void* receive(void* arg)
{
    RtpSession *session;
    u_char buffer[RECV_SIZE];
    uint32_t ts = 0;
    int have_more;
    int jittcomp = 40;
    bool_t adapt = TRUE;

    (void)arg;

    ortp_init();
    ortp_scheduler_init();
    ortp_set_log_level_mask(ORTP_DEBUG|ORTP_MESSAGE|ORTP_WARNING|ORTP_ERROR);
    signal(SIGINT, stop_handler);

    session = rtp_session_new(RTP_SESSION_RECVONLY);
    if (session == NULL)
    {
        fprintf(stderr, "Cannot create RTP session\n");
        exit(1);
    }
    rtp_session_set_scheduling_mode(session, 1);
    rtp_session_set_blocking_mode(session, 1);
    rtp_session_set_local_addr(session, "0.0.0.0", PORT);
    rtp_session_set_connected_mode(session, TRUE);
    rtp_session_set_symmetric_rtp(session, TRUE);
    rtp_session_enable_adaptive_jitter_compensation(session, adapt);
    rtp_session_set_jitter_compensation(session, jittcomp);
    rtp_session_set_payload_type(session, 0); /* mulaw */
    rtp_session_signal_connect(session, "ssrc_changed", (RtpCallback)ssrc_cb, 0);
    rtp_session_signal_connect(session, "ssrc_changed", (RtpCallback)rtp_session_reset, 0);

    /* Give the decoder thread time to start waiting on recv_over. */
    sleep(1);

    pthread_mutex_lock(&mutex);
    while (1)
    {
        have_more = 1;
        while (have_more)
        {
            recv_size = rtp_session_recv_with_ts(session, buffer, RECV_SIZE, ts, &have_more);
            /* Nothing is forwarded until a packet actually carries data. */
            if (recv_size > 0)
            {
                memcpy(recv_buf, buffer, (size_t)recv_size);
                /* Tell the decoder that recv_buf holds fresh data ... */
                pthread_cond_signal(&recv_over);
                /* ... and wait until it asks for the next packet. */
                pthread_cond_wait(&ready_recv, &mutex);
            }
        }
        ts += RECV_SIZE;
    }
    /* Not reached: the loop above has no exit condition. */
    pthread_mutex_unlock(&mutex);
    pthread_exit(0);
}//end of receive()
/*
 * Parse the Speex header carried in the first Ogg packet and create a decoder.
 *
 * op                 packet holding the Speex header
 * enh_enabled        value applied through SPEEX_SET_ENH
 * frame_size         out: decoder frame size (SPEEX_GET_FRAME_SIZE)
 * granule_frame_size out: frame size used for granule arithmetic, scaled
 *                    when forceMode differs from the header's mode
 * rate               in/out: if 0 on entry it is taken from the header; it
 *                    is scaled when forceMode differs from the header's mode
 * nframes            out: frames per packet from the header
 * forceMode          -1 to use the header's mode, otherwise the mode to use
 * channels           in/out: -1 on entry means "take from header"; any value
 *                    other than 1 is turned into 2 and the in-band stereo
 *                    handler is installed
 * stereo             stereo state handed to the in-band stereo handler
 * extra_headers      out: number of extra header packets from the header
 * quiet              non-zero suppresses the summary line on stderr
 *
 * Returns the decoder state, or NULL (after printing a message on stderr)
 * when the header is unreadable, the mode or bit-stream version is not
 * supported, or the decoder cannot be created.  The file-scope `mode` is set
 * as a side effect.
 */
static void *process_header(ogg_packet *op, spx_int32_t enh_enabled, spx_int32_t *frame_size, int *granule_frame_size, spx_int32_t *rate, int *nframes, int forceMode, int *channels, SpeexStereoState *stereo, int *extra_headers, int quiet)
{
    void *st;
    SpeexHeader *header;
    int modeID;
    SpeexCallback callback;

    header = speex_packet_to_header((char*)op->packet, op->bytes);
    if (!header)
    {
        fprintf (stderr, "Cannot read header\n");
        return NULL;
    }
    if (header->mode >= SPEEX_NB_MODES || header->mode<0)
    {
        fprintf (stderr, "Mode number %d does not (yet/any longer) exist in this version\n",
                 header->mode);
        free(header);
        return NULL;
    }

    modeID = header->mode;
    if (forceMode!=-1)
        modeID = forceMode;
    mode = speex_lib_get_mode (modeID);

    if (header->speex_version_id > 1)
    {
        fprintf (stderr, "This file was encoded with Speex bit-stream version %d, which I don't know how to decode\n", header->speex_version_id);
        free(header);
        return NULL;
    }
    if (mode->bitstream_version < header->mode_bitstream_version)
    {
        fprintf (stderr, "The file was encoded with a newer version of Speex. You need to upgrade in order to play it.\n");
        free(header);
        return NULL;
    }
    if (mode->bitstream_version > header->mode_bitstream_version)
    {
        fprintf (stderr, "The file was encoded with an older version of Speex. You would need to downgrade the version in order to play it.\n");
        free(header);
        return NULL;
    }

    st = speex_decoder_init(mode);
    if (!st)
    {
        fprintf (stderr, "Decoder initialization failed.\n");
        free(header);
        return NULL;
    }
    speex_decoder_ctl(st, SPEEX_SET_ENH, &enh_enabled);
    speex_decoder_ctl(st, SPEEX_GET_FRAME_SIZE, frame_size);
    *granule_frame_size = *frame_size;

    if (!*rate)
        *rate = header->rate;
    /* Adjust rate if a mode is being forced */
    if (forceMode!=-1)
    {
        if (header->mode < forceMode)
        {
            *rate <<= (forceMode - header->mode);
            *granule_frame_size >>= (forceMode - header->mode);
        }
        if (header->mode > forceMode)
        {
            *rate >>= (header->mode - forceMode);
            *granule_frame_size <<= (header->mode - forceMode);
        }
    }
    speex_decoder_ctl(st, SPEEX_SET_SAMPLING_RATE, rate);

    *nframes = header->frames_per_packet;

    if (*channels==-1)
        *channels = header->nb_channels;
    if (!(*channels==1))
    {
        /* Anything that is not mono is decoded as stereo. */
        *channels = 2;
        callback.callback_id = SPEEX_INBAND_STEREO;
        callback.func = speex_std_stereo_request_handler;
        callback.data = stereo;
        speex_decoder_ctl(st, SPEEX_SET_HANDLER, &callback);
    }

    if (!quiet)
    {
        fprintf (stderr, "Decoding %d Hz audio using %s mode",
                 *rate, mode->modeName);
        if (*channels==1)
            fprintf (stderr, " (mono");
        else
            fprintf (stderr, " (stereo");
        if (header->vbr)
            fprintf (stderr, ", VBR)\n");
        else
            fprintf(stderr, ")\n");
    }

    *extra_headers = header->extra_headers;
    free(header);
    return st;
}//end of process_header()
/************************************************************************************
* decode the received audio from the receiver and give it to player after decode it
*************************************************************************************/
void* decode(void* arg)
{
int c;
int option_index = 0;
short out[DEC_FRAME_SIZE];
short output[DEC_FRAME_SIZE];
int frame_size=0, granule_frame_size=0;
void *st=NULL;
SpeexBits bits;
int packet_count=0;
int stream_init = 0;
int quiet = 0;
ogg_int64_t page_granule=0, last_granule=0;
int skip_samples=0, page_nb_packets;
ogg_sync_state oy;
ogg_page og;
ogg_packet op;
ogg_stream_state os;
int enh_enabled = 1;
int nframes=2;
int print_bitrate=0;
int close_in=0;
int eos=0;
int forceMode=-1;
float loss_percent=-1;
SpeexStereoState stereo = SPEEX_STEREO_STATE_INIT;
int channels=-1;
int rate=0;
int extra_headers=0;
int lookahead;
int speex_serialno = -1;
int i, j, recv_loop,dec_loop;
/* Init Ogg data struct */
ogg_sync_init(&oy);
speex_bits_init(&bits);
/* Main decoding loop */
while (1)
{
u_char *data;
/*Get the ogg buffer for writing*/
data = ogg_sync_buffer(&oy, 160);
pthread_cond_wait(&recv_over, &mutex); //wait the signal from decoder
//printf("Decoder : Roger that!
");
/*Read bitstream from input file*/
for (recv_loop = 0; recv_loop < recv_size; recv_loop++)
{
data[recv_loop] = recv_buf[recv_loop];
}
ogg_sync_wrote(&oy, recv_size);
/*Loop for all complete pages we got (most likely only one)*/
while (ogg_sync_pageout(&oy, &og) == 1)
{
int packet_no;
if (stream_init == 0)
{
ogg_stream_init(&os, ogg_page_serialno(&og));
stream_init = 1;
}
if (ogg_page_serialno(&og) != os.serialno)
ogg_stream_reset_serialno(&os, ogg_page_serialno(&og)); /* so all streams are read. */
/*Add page to the bitstream*/
ogg_stream_pagein(&os, &og);
page_granule = ogg_page_granulepos(&og);
page_nb_packets = ogg_page_packets(&og);
if (page_granule>0 && frame_size)
{
/* FIXME: shift the granule values if --force-* is specified */
skip_samples = frame_size*(page_nb_packets*granule_frame_size*nframes - (page_granule-last_granule))/granule_frame_size;
if (ogg_page_eos(&og))
skip_samples = -skip_samples;
} else /*else if (!ogg_page_bos(&og)) skip_samples = 0;*/
{
skip_samples = 0;
}
//printf ("page granulepos: %d %d %d
", skip_samples, page_nb_packets, (int)page_granule);
last_granule = page_granule;
/*Extract all available packets*/
packet_no=0;
while (!eos && ogg_stream_packetout(&os, &op) == 1)
{
if (op.bytes>=5 && !memcmp(op.packet, "Speex", 5))
{
speex_serialno = os.serialno;
}
if (speex_serialno == -1 || os.serialno != speex_serialno)
break;
if (packet_count==0) /*If first packet, process as Speex header*/
{
st = process_header(&op, enh_enabled, &frame_size, &granule_frame_size, &rate, &nframes, forceMode, &channels, &stereo, &extra_headers, quiet);
if (!st)
exit(1);
speex_decoder_ctl(st, SPEEX_GET_LOOKAHEAD, &lookahead);
if (!nframes)
nframes=1;
}else if(packet_count<=1+extra_headers)
{
/* Ignore extra headers.Don't delete it, or you will find a notification! */
}
else
{
int lost=0;
packet_no++;
if (loss_percent>0 && 100*((float)rand())/RAND_MAX<loss_percent)
lost=1;
/*End of stream condition*/
if (op.e_o_s && os.serialno == speex_serialno) /* don't care for anything except speex eos */
eos=1;
/*Copy Ogg packet to Speex bitstream*/
speex_bits_read_from(&bits, (char*)op.packet, op.bytes);
for (j = 0; j != nframes; j++)
{
int ret;
/*Decode frame*/
if (!lost)
ret = speex_decode_int(st, &bits, output);
else
ret = speex_decode_int(st, NULL, output);
if (ret==-1)
break;
if (ret==-2)
{
fprintf (stderr, "Decoding error: corrupted stream?
");
break;
}
if (speex_bits_remaining(&bits)<0)
{
fprintf (stderr, "Decoding overflow: corrupted stream?
");
break;
}
if (channels==2)
speex_decode_stereo_int(output, frame_size, &stereo);
if (print_bitrate)
{
spx_int32_t tmp;
char ch=13;
speex_decoder_ctl(st, SPEEX_GET_BITRATE, &tmp);
fputc (ch, stderr);
fprintf (stderr, "Bitrate is use: %d bps ", tmp);
}
for (i = 0; i < frame_size*channels; i++)
out[i]=output[i];
int frame_offset = 0;
int new_frame_size = frame_size;
//printf ("packet %d %d
", packet_no, skip_samples);
//fprintf (stderr, "packet %d %d %d
", packet_no, skip_samples, lookahead);
if (packet_no==1 && j==0 && skip_samples > 0)
{
//printf ("chopping first packet
");
new_frame_size -= skip_samples+lookahead;
frame_offset = skip_samples+lookahead;
}
if (packet_no == page_nb_packets && skip_samples < 0)
{
int packet_length = nframes*frame_size+skip_samples+lookahead;
new_frame_size = packet_length - j*frame_size;
if (new_frame_size<0)
new_frame_size = 0;
if (new_frame_size>frame_size)
new_frame_size = frame_size;
//printf ("chopping end: %d %d %d
", new_frame_size, packet_length, packet_no);
}
if (new_frame_size > 0)
{
for (j = 0,dec_loop=0;(dec_loop<DEC_FRAME_SIZE/sizeof(short))&&(j<new_frame_size*channels);dec_loop++,j++)
audio_buf[dec_loop] = out[j + frame_offset*channels];
pthread_cond_signal(&ready_play); //inform player ready to play
//printf("Decoder : Sending signal to player ...
");
pthread_cond_wait(&play_over, &mutex); //wait the signal from player
//audio_size += sizeof(short)*new_frame_size*channels;
}
}//end of for (j = 0; j != nframes; j++)
}//end of else
packet_count++;
}//end of while (!eos && ogg_stream_packetout(&os, &op) == 1)
}//end of while (ogg_sync_pageout(&oy, &og) == 1)
pthread_cond_signal(&ready_recv);
//printf("Decoder : Sending signal to receiver ...
");
}//end of while (1)
if (st)
speex_decoder_destroy(st);
else
fprintf (stderr, "This doesn't look like a Speex file
");
speex_bits_destroy(&bits);
if (stream_init)
ogg_stream_clear(&os);
ogg_sync_clear(&oy);
pthread_exit(0);
}//end of decode()
/*
 * Terminate the program with the given exit code, closing the PCM device
 * first when one is open.
 */
static void prg_exit(int code)
{
    if (handle != NULL) {
        snd_pcm_close(handle);
    }
    exit(code);
}
/*
* deal with signal
*/
static void signal_handler(int sig)
{
static int in_aborting;
if (in_aborting)
return;
in_aborting = 1;
if (!quiet_mode)
fprintf(stderr, "Aborted by signal %s...
", strsignal(sig));
if (handle && sig != SIGABRT) {
snd_pcm_close(handle);
handle = NULL;
}
prg_exit(EXIT_FAILURE);
}//end of signal_handler()
/*
 * SIGUSR1 handler: raise the recycle_capture_file flag.
 */
static void signal_handler_recycle (int sig)
{
    (void)sig;
    /* request that a new output file be started */
    recycle_capture_file = 1;
}
/*
* set hardware & software parameters
*/
static void set_params(void)
{
snd_pcm_hw_params_t *params;
snd_pcm_sw_params_t *swparams;
snd_pcm_uframes_t buffer_size;
int err;
size_t n;
unsigned int rate;
snd_pcm_uframes_t start_threshold, stop_threshold;
snd_pcm_hw_params_alloca(¶ms);
snd_pcm_sw_params_alloca(&swparams);
err = snd_pcm_hw_params_any(handle, params);
if (err < 0) {
error("Broken configuration for this PCM: no configurations available");
prg_exit(EXIT_FAILURE);
}
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
if (err < 0) {
error("Access type not available");
prg_exit(EXIT_FAILURE);
}
err = snd_pcm_hw_params_set_format(handle, params, hwparams.format);
if (err < 0) {
error("Sample format non available");
prg_exit(EXIT_FAILURE);
}
err = snd_pcm_hw_params_set_channels(handle, params, hwparams.channels);
if (err < 0) {
error("Channels count non available");
prg_exit(EXIT_FAILURE);
}
rate = hwparams.rate;
err = snd_pcm_hw_params_set_rate_near(handle, params, &hwparams.rate, 0);
assert(err >= 0);
if ((float)rate * 1.05 < hwparams.rate || (float)rate * 0.95 > hwparams.rate) {
if (!quiet_mode) {
char plugex[64];
const char *pcmname = snd_pcm_name(handle);
fprintf(stderr, "Warning: rate is not accurate (requested = %iHz, got = %iHz)
", rate, hwparams.rate);
if (! pcmname || strchr(snd_pcm_name(handle), ':'))
*plugex = 0;
else
snprintf(plugex, sizeof(plugex), "(-Dplug:%s)",
snd_pcm_name(handle));
fprintf(stderr," please, try the plug plugin %s
",
plugex);
}
}
rate = hwparams.rate;
if (buffer_time == 0 && buffer_frames == 0) {
err = snd_pcm_hw_params_get_buffer_time_max(params, &buffer_time, 0);
assert(err >= 0);
if (buffer_time > 500000)
buffer_time = 500000;
}
if (period_time == 0 && period_frames == 0) {
if (buffer_time > 0)
period_time = buffer_time / 4;
else
period_frames = buffer_frames / 4;
}
if (period_time > 0)
err = snd_pcm_hw_params_set_period_time_near(handle, params, &period_time, 0);
else
err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_frames, 0);
assert(err >= 0);
if (buffer_time > 0) {
err = snd_pcm_hw_params_set_buffer_time_near(handle, params, &buffer_time, 0);
} else {
err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_frames);
}
assert(err >= 0);
monotonic = snd_pcm_hw_params_is_monotonic(params);
can_pause = snd_pcm_hw_params_can_pause(params);
err = snd_pcm_hw_params(handle, params);
if (err < 0) {
error("Unable to install hw params:");
prg_exit(EXIT_FAILURE);
}
snd_pcm_hw_params_get_period_size(params, &chunk_size, 0);
snd_pcm_hw_params_get_buffer_size(params, &buffer_size);
if (chunk_size == buffer_size) {
error("Can't use period equal to buffer size (%lu == %lu)",
chunk_size, buffer_size);
prg_exit(EXIT_FAILURE);
}
snd_pcm_sw_params_current(handle, swparams);
if (avail_min < 0)
n = chunk_size;
else
n = (double) rate * avail_min / 1000000;
err = snd_pcm_sw_params_set_avail_min(handle, swparams, n);
/* round up to closest transfer boundary */
n = buffer_size;
if (start_delay <= 0) {
start_threshold = n + (double) rate * start_delay / 1000000;
} else
start_threshold = (double) rate * start_delay / 1000000;
if (start_threshold < 1)
start_threshold = 1;
if (start_threshold > n)
start_threshold = n;
err = snd_pcm_sw_params_set_start_threshold(handle, swparams, start_threshold);
assert(err >= 0);
if (stop_delay <= 0)
stop_threshold = buffer_size + (double) rate * stop_delay / 1000000;
else
stop_threshold = (double) rate * stop_delay / 1000000;
err = snd_pcm_sw_params_set_stop_threshold(handle, swparams, stop_threshold);
assert(err >= 0);
if (snd_pcm_sw_params(handle, swparams) < 0) {
error("unable to install sw params:");
prg_exit(EXIT_FAILURE);
}
bits_per_sample = snd_pcm_format_physical_width(hwparams.format);
bits_per_frame = bits_per_sample * hwparams.channels;
chunk_bytes = chunk_size * bits_per_frame / 8;
}//end of set_params()
/*
 * Pause playback until the user presses SPACE or ENTER on stdin.
 *
 * Does nothing except print a notice when the device cannot pause
 * (can_pause == 0).  Errors from snd_pcm_pause() are reported with error().
 */
static void do_pause(void)
{
    int err;
    unsigned char b;

    if (!can_pause) {
        fprintf(stderr, "\rPAUSE command ignored (no hw support)\n");
        return;
    }
    err = snd_pcm_pause(handle, 1);
    if (err < 0) {
        error("pause push error: %s", snd_strerror(err));
        return;
    }
    while (1) {
        /* Spin until one byte could be read from stdin. */
        while (read(fileno(stdin), &b, 1) != 1);
        if (b == ' ' || b == '\r') {
            /* Swallow whatever else is pending before resuming. */
            while (read(fileno(stdin), &b, 1) == 1);
            err = snd_pcm_pause(handle, 0);
            if (err < 0)
                error("pause release error: %s", snd_strerror(err));
            return;
        }
    }
}
/*
 * Fallback for systems whose headers do not provide timersub(); used by xrun().
 * Stores a - b in *result, borrowing one second when the microsecond
 * difference is negative.
 */
#ifndef timersub
#define timersub(a, b, result) \
do { \
(result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
(result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
if ((result)->tv_usec < 0) { \
--(result)->tv_sec; \
(result)->tv_usec += 1000000; \
} \
} while (0)
#endif
/*
 * I/O error handler, called by pcm_write() when a write returns -EPIPE.
 *
 * - State XRUN: report the underrun/overrun on stderr (with its duration when
 *   the device has no monotonic timestamps) and re-prepare the device.
 * - State DRAINING on a capture stream: re-prepare the device.
 * - Anything else, or any failing ALSA call: report and exit via prg_exit().
 */
static void xrun(void)
{
    snd_pcm_status_t *status;
    int res;

    snd_pcm_status_alloca(&status);
    if ((res = snd_pcm_status(handle, status))<0) {
        error("status error: %s", snd_strerror(res));
        prg_exit(EXIT_FAILURE);
    }
    if (snd_pcm_status_get_state(status) == SND_PCM_STATE_XRUN) {
        if (monotonic) {
            fprintf(stderr, "%s !!!\n", "underrun");
        } else {
            struct timeval now, diff, tstamp;
            gettimeofday(&now, 0);
            snd_pcm_status_get_trigger_tstamp(status, &tstamp);
            timersub(&now, &tstamp, &diff);
            fprintf(stderr, "%s!!! (at least %.3f ms long)\n",
                    stream == SND_PCM_STREAM_PLAYBACK ? "underrun" : "overrun",
                    diff.tv_sec * 1000 + diff.tv_usec / 1000.0);
        }
        if ((res = snd_pcm_prepare(handle))<0) {
            error("xrun: prepare error: %s", snd_strerror(res));
            prg_exit(EXIT_FAILURE);
        }
        return; /* ok, data should be accepted again */
    }
    if (snd_pcm_status_get_state(status) == SND_PCM_STATE_DRAINING) {
        if (stream == SND_PCM_STREAM_CAPTURE) {
            fprintf(stderr, "capture stream format change? attempting recover...\n");
            if ((res = snd_pcm_prepare(handle))<0) {
                error("xrun(DRAINING): prepare error: %s", snd_strerror(res));
                prg_exit(EXIT_FAILURE);
            }
            return;
        }
    }
    error("read/write error, state = %s", snd_pcm_state_name(snd_pcm_status_get_state(status)));
    prg_exit(EXIT_FAILURE);
}//end of xrun()
/*
 * I/O suspend handler, called by pcm_write() when a write returns -ESTRPIPE.
 *
 * Retries snd_pcm_resume() once per second while it reports -EAGAIN; if the
 * resume fails for another reason the stream is re-prepared instead, and the
 * program exits through prg_exit() when that fails too.  Progress messages
 * go to stderr unless quiet_mode is set; stderr is flushed either way.
 */
static void suspend(void)
{
    int res;

    if (!quiet_mode)
        fprintf(stderr, "Suspended. Trying resume. ");
    fflush(stderr);
    while ((res = snd_pcm_resume(handle)) == -EAGAIN)
        sleep(1); /* wait until suspend flag is released */
    if (res < 0) {
        if (!quiet_mode)
            fprintf(stderr, "Failed. Restarting stream. ");
        fflush(stderr);
        if ((res = snd_pcm_prepare(handle)) < 0) {
            error("suspend: prepare error: %s", snd_strerror(res));
            prg_exit(EXIT_FAILURE);
        }
    }
    if (!quiet_mode)
        fprintf(stderr, "Done.\n");
}
/*
* write function
*/
static ssize_t pcm_write(short *data, size_t count)
{
ssize_t r;
ssize_t result = 0;
int i = 1;
while (count > 0)
{
r = writei_func(handle, data, count);
if (r == -EAGAIN || (r >= 0 && (size_t)r < count))
{
if (!test_nowait)
snd_pcm_wait(handle, 100);
} else if (r == -EPIPE) {
xrun();
} else if (r == -ESTRPIPE) {
suspend();
} else if (r < 0) {
error("write error: %s", snd_strerror(r));
prg_exit(EXIT_FAILURE);
}
if (r > 0)
{
result += r;
count -= r;
data += r * bits_per_frame / 8;
}
}
return result;
}//end of pcm_write()
/***********************************************************************************
* play the audio stream after decode it
***********************************************************************************/
void* play(void* arg)
{
char *pcm_name = "default";
unsigned int err;
stream = SND_PCM_STREAM_PLAYBACK;
hwparams.format = DEFAULT_FORMAT;
hwparams.rate = DEFAULT_SPEED;
hwparams.channels = CHANNELS;
/* open sound device */
err = snd_pcm_open(&handle, pcm_name, stream, 0);
if (err < 0)
{
perror("Audio open error");
exit(1);
}
writei_func = snd_pcm_writei;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
signal(SIGABRT, signal_handler);
signal(SIGUSR1, signal_handler_recycle);
set_params();
while(1)
{
pthread_cond_wait(&ready_play, &mutex); //wait the signal from decoder
//printf(" Player : Roger that!
");
pcm_write(audio_buf, PERIOD_FRAMES);
pthread_cond_signal(&play_over);
}
pthread_mutex_unlock(&mutex);
snd_pcm_nonblock(handle, 0);
snd_pcm_drain(handle);
snd_pcm_nonblock(handle, nonblock);
snd_pcm_close(handle);
pthread_exit(0);
}//end of play()
/*************************************************************************
* receive-->decode-->playback
*************************************************************************/
int main(int argc, char** argv)
{
/* receive the audio stream from network and give it to decoder */
recv_buf = (u_char *)malloc(160);
if (NULL == recv_buf)
printf("Malloc recv_buf
");
/* play the audio stream after the decoder deal with it, it's just one period size */
audio_buf = (short *)malloc(PERIOD_FRAMES*4);
if (NULL == audio_buf)
printf("Malloc audio_buf
");
int t1 = 0, t2 = 0, t3 = 0;
/* First, receive the audio stream from network */
t1 = pthread_create(&trecv, NULL, receive, NULL);
if(t1 != 0)
printf("Create receive thread error!
");
/* Second, decode the audio stream */
t2 = pthread_create(&tdec, NULL, decode, NULL);
if(t2 != 0)
printf("Create decode thread error!
");
/* Third, playback the audio stream */
t3 = pthread_create(&tplay, NULL, play, NULL);
if(t3 != 0)
printf("Create play thread error!
");
/* waitting for all the threads quit */
pthread_join(trecv, NULL);
pthread_join(tdec, NULL);
pthread_join(tplay, NULL);
free(recv_buf);
free(audio_buf);
return 0;
}//end of main()